| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
- AWS
- NGINX
- 본인인증
- 컨퍼런스
- 보안
- 딥시크
- 로그인
- AOP
- Spring
- ktlin
- postgis
- Mono
- Spring Boot
- 코틀린
- IntelliJ
- 허깅 페이스
- Kotlin
- docker
- PostgreSQL
- webflux
- API
- Flux
- 본인확인
- spring security
- exception
- 공동인증서
- deepseek vs chatgpt
- netty
- db
- 인증
- Today
- Total
[수미수의 개발 브로구]
[WebFlux] Flux 병합 처리 및 API 병렬 처리 활용 본문
Flux 병합 처리
Flux 사용 시 병합 처리는 여러 스트림을 동시에 처리하는 각각의 스트림에서 생성된 데이터를 결합하여 하나의 스트림으로 방출 하는데 사용된다. Flux 에서는 여러 병합 연산자가 있으며, merge(), mergeSequential(), concat(), zip() 등을 사용 할 수 있다.
merge()
merge() 는 여러 스트림을 병렬로 처리하여 즉시 데이터를 방출 하며, 각 스트림의 데이터를 순서에 상관 없이 결합한다. 여러 스트림의 데이터를 빠르게 처리하고 순서가 중요하지 않을 때 사용 한다.
Merge data from Publisher sequences emitted by the passed Publisher into an interleaved merged sequence. Unlike concat, inner sources are subscribed to eagerly.

아래 예제는 두 개의 Flux 신호에 대해서 순서에 상관없이 즉시 방출 하며, 데이터의 순서는 스트림의 방출 시간에 따라 달라 질 수 있다.
import reactor.core.publisher.Flux;
public class FluxMergeExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(50));
Flux.merge(flux1, flux2)
.subscribe(System.out::println);
// 출력: D E F A B C (순서가 달라질 수 있음)
}
}
mergeSequential()
mergeSequential() 은 여러 스트림을 병렬로 처리 하지만, 각 스트림의 데이터를 순서데로 방출 하며, 첫번째 스트림의 데이터가 모두 방출 된 후 두 번째 스트림의 데이터가 방출 된다. 여러 스트림을 병렬로 실행 하면서도 결괄르 순차적으로 처리 할 때 사용 된다.
Merge data from Publisher sequences provided in an array/vararg into an ordered merged sequence. Unlike concat, sources are subscribed to eagerly. Unlike merge, their emitted values are merged into the final sequence in subscription order.

아래 예제 코드는 병렬로 처리 되지만, flux1의 모든 데이터가 출력 된 후 flux2 의 데이터가 출력 된다.
import reactor.core.publisher.Flux;
public class FluxMergeSequentialExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(50));
Flux.mergeSequential(flux1, flux2)
.subscribe(System.out::println);
// 출력: A B C D E F
}
}
concat()
concat() 은 여러 스트림을 순차적으로 이어 붙이며, 첫 번째 스트림이 끝난 후 두번째 스트림을 이어서 처리 하며, 각각의 스트림이 끝나면 다음 스트림을 시작 한다. 스트림들이 하나의 스트림으로 결합 되며, 각 스트림의 요소들이 순차적으로 출력 된다. mergeSequential() 과 비슷 하지만, 병렬로 처리되지 않고 순차적으로 데이터를 결합한다. 데이터의 순서를 보장하면서 순차적으로 데이터를 처리 할 때 사용 된다.
Concatenate all sources provided in an Iterable, forwarding elements emitted by the sources downstream.
Concatenation is achieved by sequentially subscribing to the first source then waiting for it to complete before subscribing to the next, and so on until the last source completes. Any error interrupts the sequence immediately and is forwarded downstream.

import reactor.core.publisher.Flux;
public class FluxConcatExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C").delayElements(Duration.ofMillis(100));
Flux<String> flux2 = Flux.just("D", "E", "F").delayElements(Duration.ofMillis(50));
Flux.concat(flux1, flux2)
.subscribe(System.out::println);
// 출력: A B C D E F
}
}
zip()
zip() 은 여러 스트림을 병렬로 결합하여 각 스트림의 동일한 순서에 있는 요소들을 결합한다. 각 스트림의 같은 인덱스에 위치한 요소들을 결합하며, 스트림 중 하나라도 완료 되면 그 시점에서 결합을 멈춘다. 결과 값은 각 스트림의 각 요소들이 튜플 형식으로 결합되어 방출 된다
Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into an output value (constructed by the provided combinator). The operator will continue doing so until any of the sources completes. Errors will immediately be forwarded. This "Step-Merge" processing is especially useful in Scatter-Gather scenarios.

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
public class FluxZipExample {
public static void main(String[] args) {
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("1", "2", "3");
Flux.zip(flux1, flux2)
.subscribe(tuple -> {
String element1 = tuple.getT1(); // flux1의 요소
String element2 = tuple.getT2(); // flux2의 요소
System.out.println(element1 + " - " + element2);
});
// 출력:
// A - 1
// B - 2
// C - 3
}
}
Flux 병합 비교
| merge() | mergeSequential() | concat() | zip() | |
| 처리 방식 | 병렬 처리 | 병렬 처리 | 순차 처리 | 병렬 처리 |
| 결합 규칙 | 순서를 보장 안함 | 스트림 순서에 맞게 결합 | 첫 번째 스트림이 끝나면 두 번째 스트림 실행 | 각 스트림의 요소가 동일한 순서에서 결합 |
| API 활용 예 | API 호출 간 순서가 중요하지 않으며, 빠른 데이터를 처리하고 싶을 때 사용 | API 호출은 병렬로 실행하고, 각 스트림의 데이터의 순서가 중요한 경우 | API 호출간 순차적 의존성 있을 때 사용 가능 | 동시에 여러 API 를 비동기로 병렬 처리 후 응답을 하나의 결과로 합칠 때 사용 가능 |
API 활용
실제 업무에서는 Flux 의 병합 연사자를 API 처리에서 활용 할 수 있다. 각각의 연사자의 동작 방식이 다르기 때문에 비지니스에 맞게 처리 리 할 수 있도록 한다.
merge() 이용한 API 처리
아래 샘플은 merge() 를 이용한 API 호출이며, 첫번째, 두번째 API가 병렬로 호출 되며, 각 API 의 데이터가 방출되는 즉시 처리 되며, 데이터의 순서는 보장되지 않는다.
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
class ApiService {
private val webClient: WebClient = WebClient.create()
fun callApi1(): Flux<String> {
return webClient.get()
.uri("https://endpoint1")
.retrieve()
.bodyToFlux(String::class.java)
}
fun callApi2(): Flux<String> {
return webClient.get()
.uri("https://endpoint2")
.retrieve()
.bodyToFlux(String::class.java)
}
fun mergeApis(): Flux<String> {
return Flux.merge(callApi1(), callApi2())
}
}
mergeSequential() 이용한 API 처리
아래 샘플은 mergeSequential() 를 이용한 API 호출이며, API 호출은 병렬로 진행 되지만, 첫 번째 API의 모든 데이터가 처리된 후 두 번째 API의 데이터가 순차적으로 처리 된다.
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux
class ApiService {
private val webClient: WebClient = WebClient.create()
fun callApi1(): Flux<String> {
return webClient.get()
.uri("https://endpoint1")
.retrieve()
.bodyToFlux(String::class.java)
}
fun callApi2(): Flux<String> {
return webClient.get()
.uri("https://endpoint2")
.retrieve()
.bodyToFlux(String::class.java)
}
fun mergeSequentialApis(): Flux<String> {
return Flux.mergeSequential(callApi1(), callApi2())
}
}
concat 을 이용한 API 처리
아래 예제 코드는 concat 을 이용하여, 여러 개의 API 를 순차적으로 호출 하는 예제 코드이다. concat 은 순차적으로 호출 하기 때문에, API 간 순서가 중요한 경우에 적합 하다.
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ApiService {
private final WebClient webClient = WebClient.create();
public Mono<String> callApi1() {
return webClient.get()
.uri("https://endpoint1")
.retrieve()
.bodyToMono(String.class);
}
public Mono<String> callApi2() {
return webClient.get()
.uri("https://endpoint2")
.retrieve()
.bodyToMono(String.class);
}
public Mono<String> callMultipleApisSequentially() {
return Flux.concat(callApi1(), callApi2())
.reduce((result1, result2) ->
"Result from API 1: " + result1 + ", Result from API 2: " + result2
);
}
}
zip 을 이용한 API 병렬 처리
아래 샘플은 zip() 를 이용한 API 호출이며, 두 API 가 병렬로 호출되며, 각 API 호출 결과를 결합하여 하나의 결과로 반환 한다. 두 API 호출의 결과가 모두 준비되어야 결합이 이루어진다.
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class ApiService {
private final WebClient webClient = WebClient.create();
public Mono<String> callApi1() {
return webClient.get()
.uri("https://endpoint1")
.retrieve()
.bodyToMono(String.class);
}
public Mono<String> callApi2() {
return webClient.get()
.uri("https://endpoint2")
.retrieve()
.bodyToMono(String.class);
}
public Mono<String> callMultipleApis() {
return Mono.zip(callApi1(), callApi2())
.map(tuple -> {
String result1 = tuple.getT1();
String result2 = tuple.getT2();
return "Result from API 1: " + result1 + ", Result from API 2: " + result2;
});
}
}
결론
WebFlux 에서 API 처리 방법을 고민 하면서, Flux 의 merge(), mergeSequential(), concat(), zip() 과 같은 연산자들을 확인 하였다. 이러한 연산자들은 병렬처리 또는 순차 처리 등 각 특징을 가지고 있으며, 업무에 맞게 적절한 연산자를 사용해서 API 를 개발 하면 상황에 맞춰 최적의 성능을 발휘 할 수 있다.
참고 문헌
'Language & Framework > WebFlux' 카테고리의 다른 글
| [WebFlux] Coroutine 과 Reactor Stream (3) | 2024.10.02 |
|---|---|
| [WebFlux] WebFlux Custom Exception 생성하기 (1) | 2023.09.26 |
| [WebFlux] WebFlux Exception 처리 (0) | 2023.09.21 |
| [WebFlux] WebFlux Server Request 에 Custom Header 추가하기 (0) | 2023.09.15 |
| [WebFlux] Spring WebFlux API 서버 만들기 (0) | 2023.08.27 |