Core Mono / Flux Operators - Building Spring Boot WebFlux API "reactive-user-service" (Reactive Data Pipeline )
- coding z2m
- Jul 1
- 2 min read
Covers: map(), filter(), flatMap(), zip(), combineLatest(), timeout()
These operators help:
Transform data (map, flatMap)
Filter streams (filter)
Merge/combine multiple streams (zip, combineLatest)
Handle performance concerns (timeout) 🔹 1. map() – Transform Each Item
📦 Transforms items one-to-one (sync transform)
🧾 Example: Mono<String> mono = Mono.just("john");
mono.map(String::toUpperCase)
.subscribe(System.out::println); // Output: JOHN Flux<Integer> numbers = Flux.just(1, 2, 3);
numbers.map(n -> n * 10)
.subscribe(System.out::println); // 10, 20, 30 ✅ Use when: You want to transform individual items 🔹 2. filter() – Remove Unwanted Items
🧹 Keeps only matching values
🧾 Example: Flux<String> fruits = Flux.just("Apple", "Banana", "Mango", "Avocado");
fruits.filter(f -> f.startsWith("A"))
.subscribe(System.out::println); // Apple, Avocado ✅ Use when: You want to filter items based on a condition. 🔹 3. flatMap() – Transform into New Reactive Types
🔄 Converts one item → reactive stream (Mono or Flux)
💡 Think of it like .map() + .flatten()
🧾 Mono + flatMap: Mono<String> mono = Mono.just("user123");
mono.flatMap(id -> getUserDetails(id)) // returns Mono<User>
.subscribe(user -> System.out.println(user.getName())); public Mono<User> getUserDetails(String id) {
return Mono.just(new User(id, "John"));
} ⚠️ Use flatMap when:
Your mapping function returns Mono or Flux
You’re making async calls (DB/API)
Don’t use map() with Mono-returning methods 🔹 4. zip() – Combine Items from Multiple Publishers
🤝 Combine 2+ reactive streams by position
🧾 Example: Mono<String> firstName = Mono.just("John");
Mono<String> lastName = Mono.just("Doe");
Mono<String> fullName = Mono.zip(firstName, lastName)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
fullName.subscribe(System.out::println); // Output: John Doe For Flux, it merges element-wise: Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<String> flux2 = Flux.just("A", "B", "C");
Flux<String> result = Flux.zip(flux1, flux2, (num, str) -> num + "-" + str);
result.subscribe(System.out::println); // 1-A, 2-B, 3-C 🔹 5. combineLatest() – Combine Latest Values
⚡ Combines most recent values from multiple sources
Flux<String> colors = Flux.just("Red", "Green", "Blue").delayElements(Duration.ofMillis(100));
Flux<String> shapes = Flux.just("Circle", "Square").delayElements(Duration.ofMillis(150));
Flux.combineLatest(colors, shapes, (c, s) -> c + " " + s)
.subscribe(System.out::println); ✅ Useful for UI, dashboard, streaming feeds 🔹 6. timeout() – Timeout if No Item Arrives in Time
⏱ Cancel & throw error if nothing arrives in X ms
🧾 Example: Flux<String> delayedFlux = Flux.just("A", "B", "C")
.delayElements(Duration.ofSeconds(2));
delayedFlux
.timeout(Duration.ofSeconds(1)) // throws TimeoutException
.subscribe(System.out::println, System.err::println); ✅ Use when:
You expect a stream to respond quickly
You need to protect downstream systems ✅ Summary Table
Operator | Use Case | Returns |
map() | Sync transform item → item | Mono / Flux |
filter() | Filter values by condition | Flux |
flatMap() | Async transform item → Mono/Flux | Mono / Flux |
zip() | Combine values by index | Mono / Flux |
combineLatest() | Combine latest from multiple streams | Flux |
timeout() | Fail if no item within time limit | Mono / Flux |
Let’s now build a small, real-world reactive REST API using: Goal: Create a Spring Boot WebFlux API that fetches user details, enriches with address, filters active users only, and returns a UserResponse DTO — using Mono/Flux and core operators like map(), flatMap(), filter(), zip()
Comments