본문 바로가기
Spring Boot(JAVA)

2. Reactive - CompletableFuture

by #Glacier 2024. 4. 4.
반응형

CompletableFuture는 Java 8에서 비동기 프로그래밍을 지원코자 도입되었다.

Lambda, Method Reference 등 Java 8의 새로운 기능을 지원한다.

 

1. Method Reference

"::" 연산자를 통해 함수에 대한 참조를 간결하게 표현한다.

 

1) Method Reference

객체 메소드에 참조

2) Static Method Reference

스태틱 메소드에 참조

3) Instance Method Reference

클래스가 가지고 있는 인스턴스에 대한 참조 

4) Constructor Method Reference

생성자 메소드의 참조

 

즉 메소드의 레퍼런스는 보통 참조의 표현보다 조금 더 간결하게 표현할 수 있는 표현식이다.

 

2. CompletableFuture Class

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

 

CompletableFuture는 Future와 CompletionStage로 구현되어

 

1) Future

- 비동기적 작업 수행 후 작업 결과를 반환하는 인터페이스

 

2) CompletionStage

- 비동기 작업 수행, 결과 처리 혹은 다른 CompletionStage를 연결하는 인터페이스

 

이는 Reactive 개발 외적으로 Kafka쪽에서도 봤듯이 비동기 처리후 Future를 반환하며

onCompletion()을 통해 콜백함수를 재정의하여 사용했었던 것과 같다.

Future는 외부에서 Future를 컨트롤 하기가 어렵다 (cancle메소드를 빼고).

또, .get()을 하는 경우계속 기다리기 때문에 비동기적 처리가 어렵다.

isDone() 메소드는 중간에 실행되다 중단되어 취소되었음에도 isDone() 메서드가 true를 반환하기 때문에Exception을 명확하게 처리하기가 어렵다.

 

ExecutorService

- 쓰레드 풀을 이용하여 비동기적으로 작업 을 실행하고 관리, 효율적 자원관리

- 별도의 쓰레드를 생성하고 관리하지 않아도 되므로, 코드를 간결하게 유지

 

1) newSigleThreadExecutor : 단일 쓰레드로 구성된 쓰레드 풀을 생성하여 한 번에 하나의 작업만 실행

2) newFixedThreadPool : 고정된 크기의 쓰레드 풀을 생성, 크기는 인자로 주어짐

3) newCachedThreadPool : 사용 가능한 쓰레드가 없으면 생성, 있으면 재사용, 놀면 회수

4) newScheduledThreadPool : 스케줄링 기능이 있는 쓰레드 풀, 주기적, 지연이 발생하는 작업 실행

5) newWorkStealingPool: work Steal 알고리즘을 사용하는 ForkJoinPool 생성

 

CompletionStage는 많은 연산자를 제공하며 비동기 task를 실행, 값 변형 등 연산자 체이닝을 조합할 수 있다.

또한 에러처리를 위한 콜백이 제공된다.

 

비동기 처리시에는 다른 Thread가 필요하고, CompletionStage는 ForkJoinPool을 사용한다.

 

ForkJoinPool

- 기본 Pool Size는 할당 Cpu코어 - 1개

- 하나의 Task를 subtask로 나누어 steal work 알고리즘을 이용해 쓰레드 풀 내의 쓰레드와 균등하게 처리

- join을 통해 조각 조각 결과를 생성

- Daemon Thread로 Main Thread가 종료되면 바로 종료됨

 

CompletionStage의 연산자

 

1) thenAccept(Async)

- Consumer를 파라미터로 받아 다음 task에게 null을 전달.

- 값을 받아 action을 수행하는 경우에 사용

 -completionStage에서 -> thenAccept -> isDone(True) -> caller Thread에서 action 실행

                                     -> thenAccept -> isDone(false) -> callee Thread에서 action 실행

                                     -> thenAcceptAsync -> ThreadPool에 있는 Thread에서 action 실행

 

2) thenApply(Async)

- Function을 파라미터로 받아 이전 task로부터 T 타입의 값을 받아 가공하여 U 타입의 값을 반환한다.

- 다음 Task에게 반환했던 값이 전달되며 값을 변형하여 전달할 때 사용

 

3) thenCompose(Async)

- Function을 파라미터로 받아 이전 task로부터 T타입의 값을 받아 가공하여 U타입의 CompletionStage를 반환

- 반환한 CompletionStage가 done상태이면 값을 다음 task로 전달

- 다른 future를 반환해야 하는 경우 유용

 

4) thenRun(Async)

- Runnable을 파라미터로 받아 이전 task로부터 값을 받지도, 결과를 반환하지도 않는다.

- 다음 task에 null이 전달되며, future가 완료되었다는 이벤트를 기록할 때 사용한다.

 

5) exceptionally 

- Function을 파라미터로 받아 이전 task에서 발생한 exception을 받아서 처리한다.

- 다음 task에게 반환된 값을 전달한다.

- future 파이프에서 발생한 에러를 처리할 때 사용한다.

 

6) supplyAsync

- Supplier를 받아 CompletableFuture를 생성할 수 있으며 반환값이 된다.

 

7) runAsync

- Runnable을 받아 CompletableFuture를 생성할 수 있으며 값을 반환하지 않고 다음 task에 null을 전달한다.

 

8) complete 

- CompletableFuture가 완료되지 않았다면 주어진 값으로 채우고, complete에 의해 상태가 바뀌었으면 true, 아니면 false를 반환한다.

 

9) isCompletedExceptionally 

- Exception에 의해 Complete되었는지 확인 가능

 

10) allOf

- 여러 completableFuture을 모아서 하나의 completableFuture로 변환

- 모든 completableFuture가 완료되면 상태가 done으로 변경, void 이므로 각각의 값에 get으로 접근 필요함.

 

11) anyOf

- 여러 completableFuture를 모아서 하나의 completableFuture 변환

- 주어진 completableFuture중 하나라도 완료되면 상태가 done으로 변경, 제일 먼저 변경되는 future의 값 반환

 

CompletableFuture의 한계점

- 지연 로딩 기능을 제공하지 않음

- CompletableFuture를 반환하는 함수를 호출 시 즉시 작업이 실행됨

- 지속적으로 생성되는 데이터는 처리 어려움

- CompletableFuture에서 데이터를 반환하고 나면 다시 다른 값을 전달하기 어려움.

반응형