Reactor의 핵심요소
1. Stream
1) publisher
2) subscriber
3) subscription
4) processor
subscriber는 subscribe를 통해 publisher에게 요청을 하게되고, 이 과정에서 subscription이라는 객체로 publisher와 연결.
subscriber는 구독하여 요청하거나 요청 취소할 수 있고, publisher는 데이터를 제공하거나, 오류처리, 종료시킬 수 있다.
processor는 publisher와 subscriber를 상속받는 인터페이스이다.
2. asynchronous
데이터 처리 중 같은 쓰레드 혹은 다른 쓰레드를 사용하거나 병렬 처리할 수 있다.
3. back pressure
처리속도 불균형이 있을 경우 불균형을 처리하는 장치로,
publisher가 제공하는 데이터를 subscriber가 처리할 수 있는지 확인할 수 있는 장치.
Project Reactor
여러 구현체 중에서 Spring Webflux에서 사용하는 것은 Project Reactor로서, VMWare에서 관리하는 오픈소스이다.
한 개 이상의 데이터를 의미하는 flux, 단일 데이터 혹은 데이터 없음을 표현하는 Mono로 표현한다.
실제로 퍼블리셔를 하나 구현한 후, Flux나 Mono하나를 구현하여 메인 메소드에서 간단히 로그를 찍어보면
아래와 같이 작성되는 로그를 볼 수 있다.
subscriber가 publisher를 subscribe하면 onSubscribe()가 퍼블리셔로부터 반환되며
subscriber가 request (unbounded, 제한없이) 요청을 하면,
publisher는 onNext()로 하나 하나 보내면서 마지막에 onComplete()가 일어난다.
만약 도중 에러가 발생하는 경우 onError()가 발생한다.
Operator
스트림 데이터를 어떻게 Operate 할 것인가!
map
스트림 데이터를 1:1로 Transformation하는 역할 ( ex, i -> i*2 )
public Flux<Integer> fluxMap() {
return Flux.range(1, 5)
.map(i -> i*2).log();
}
Operator1 operator1 = new Operator1();
@Test
void fluxMap() {
StepVerifier.create(operator1.fluxMap())
.expectNext(2, 4, 6, 8, 10)
.verifyComplete();
}
filter
조건에 따라 필터링하는 Operator
public Flux<Integer> fluxFilter() {
return Flux.range(1, 10)
.filter(i -> i > 5)
.log();
}
@Test
void fluxFilter() {
StepVerifier.create(operator1.fluxFilter())
.expectNext(6, 7, 8,9, 10)
.verifyComplete();
}
take
스트림 데이터 중 일부만 취하는 Operator
public Flux<Integer> fluxTake() {
return Flux.range(1, 10)
.filter(i -> i > 5)
.take(3)
.log();
}
@Test
void fluxTake() {
StepVerifier.create(operator1.fluxTake())
.expectNext(6, 7, 8)
.verifyComplete();
}
flatMap
#1개의 item을 1개 이상의 새로운 stream으로 변환하는 1:N Operator
# I/O에 대해서 비동기 처리 (DB, 서버 API 요청 등)
public Flux<Integer> fluxFlatMap() {
return Flux.range(1, 10)
.flatMap(i -> Flux.range(i*10,10)
.delayElements(Duration.ofMillis(100)))
.log();
}
@Test
void fluxFlapMap() {
StepVerifier.create(operator1.fluxFlatMap())
.expectNextCount(100)
.verifyComplete();
}
concatMap
- 다음 요소를 처리하기 전에 이전 요소가 완료될때까지 기다린다.
- 순서가 중요한 작업에서 flatMap대신 concatMap을 사용
public Flux<Integer> fluxConcatMap() {
return Flux.range(1, 10)
.concatMap(i -> Flux.range(i*10, 10))
.delayElements(Duration.ofMillis(100))
.log();
}
@Test
void fluxConcatMap() {
StepVerifier.create(operator2.fluxConcatMap())
.expectNextCount(100)
.verifyComplete();
}
flatMapMany
- Mono의 단일 아이템에서 Flux로 변환할 때 사용
public Flux<Integer> monoFlatMapMany() {
return Mono.just(10)
.flatMapMany(i -> Flux.range(1, i)).log();
}
@Test
void monoFlatMapMany() {
StepVerifier.create(operator2.monoFlatMapMany())
.expectNextCount(10)
.verifyComplete();
}
switchIfEmpty / defaultIfEmpty
- 전달받은 값이 Empty인 경우에 대한 로직을 선택할 수 있도록 돕는 연산자
public Mono<Integer> defaultIfEmpty() {
return Mono.just(100)
.filter(i -> i > 100)
.defaultIfEmpty(30);
}
public Mono<Integer> switchIfEmpty() {
return Mono.just(100)
.filter(i -> i > 100)
// .switchIfEmpty(Mono.just(30).map(i-> i*2));
.switchIfEmpty(Mono.error(new Exception("Not exists value..."))).log();
}
@Test
void defaultIfEmpty() {
StepVerifier.create(operator2.defaultIfEmpty())
.expectNext(30)
.verifyComplete();
}
@Test
void switchIfEmpty() {
StepVerifier.create(operator2.switchIfEmpty())
.expectNext(60)
.verifyComplete();
}
merge / zip
- 여러개의 stream을 하나로 모아주는 연산자
public Flux<String> fluxMerge() {
return Flux.merge(Flux.fromIterable(List.of("1", "10", "12")), Flux.just("4"))
.log();
}
public Flux<String> monoMerge() {
return Mono.just("1").mergeWith(Mono.just("2")).mergeWith(Mono.just("3")).log();
}
public Flux<String> fluxZip() {
//(a,d), (b,e), (c,f)
return Flux.zip(Flux.just("a", "b", "c"), Flux.just("d", "e", "f"))
.map(i->i.getT1() + i.getT2()).log();
}
public Mono<Integer> monoZip() {
return Mono.zip(Mono.just(1), Mono.just(2), Mono.just(3))
.map(i -> i.getT1() + i.getT2() + i.getT3());
}
@Test
void fluxMerge() {
StepVerifier.create(operator2.fluxMerge())
.expectNext("1", "10", "12", "4")
.verifyComplete();
}
@Test
void monoMerge() {
StepVerifier.create(operator2.monoMerge())
.expectNext("1", "2", "3")
.verifyComplete();
}
@Test
void fluxZip() {
StepVerifier.create(operator2.fluxZip())
.expectNext("ad", "be", "cf")
.verifyComplete();
}
@Test
void monoZip() {
StepVerifier.create(operator2.monoZip())
.expectNext(6)
.verifyComplete();
}
- count / distinct / reduce / groupby (집계연산)
count : 여러 스트림의 데이터들을 한대 모아 몇 개인지 집계
public Mono<Long> fluxCount(){
return Flux.range(1, 10).count();
}
@Test
void fluxCount() {
StepVerifier.create(operator3.fluxCount())
.expectNext(10L)
.verifyComplete();
}
distinct : 중복된 것들을 지워주는 연산자
public Flux<String> fluxDistinct() {
return Flux.fromIterable(List.of("a", "b", "a", "c", "a"))
.distinct();
}
@Test
void fluxDistinct() {
StepVerifier.create(operator3.fluxDistinct())
.expectNext("a", "b", "c")
.verifyComplete();
}
reduce : 배열 형태의 여러 값들을 연산을 통해 하나로 줄여주는 연산자
public Mono<Integer> fluxReduce() {
return Flux.range(1, 10)
.reduce((i, j) -> i+j);
}
@Test
void fluxReduce() {
StepVerifier.create(operator3.fluxReduce())
.expectNext(55)
.verifyComplete();
}
groupby : 동일한 값에 대해 혹은 특정 값들을 묶어서 처리하는 연산자
public Flux<Integer> fluxGroupBy() {
return Flux.range(1, 10)
.groupBy(i -> i % 2 == 0 ? "even" : "odd")
.flatMap(group -> group.reduce((i, j) -> i+j)).log();
}
@Test
void fluxGroupBy() {
StepVerifier.create(operator3.fluxGroupBy())
.expectNext(30, 25)
.verifyComplete();
}
delaySequence : 딜레이 시킴
limitRate : 백프레셔의 리퀘스트를 조절하도록 하는 기능.
public Flux<Integer> fluxDelayAndList() {
return Flux.range(1, 10)
.delaySequence(Duration.ofSeconds(1))
.log()
.limitRate(2);
}
@Test
void fluxDelayAndList() {
StepVerifier.create(operator4.fluxDelayAndList())
.expectNext(1,2,3,4,5,6,7,8,9,10)
.verifyComplete();
}
sample : 일부 샘플링 해서 요청하는 기능
public Flux<Integer> fluxSample() {
return Flux.range(1, 100)
.delayElements(Duration.ofMillis(100))
.sample(Duration.ofMillis(300))
.log();
}
@Test
void fluxSample() {
StepVerifier.create(operator4.fluxSample())
.expectNextCount(1000)
.verifyComplete();
}
'Spring Boot(JAVA)' 카테고리의 다른 글
6. Reactive - Reactor Schedulers (0) | 2024.04.05 |
---|---|
4. Reactive - Spring MVC VS Webflux (0) | 2024.04.04 |
3. Reactive - Netty (0) | 2024.04.04 |
2. Reactive - CompletableFuture (0) | 2024.04.04 |
1. Reactive - 함수형 인터페이스 / 블로킹 & 논블로킹 (0) | 2024.04.04 |