Java - CompletableFuture
비동기적인, 멀티스레드를 활용한 프로그램을 작성할 때 callback 함수들을 많이 작성하게 되고 그러한 코드들은 작성하기 어려워진다. 만약 그 콜백 내에서 예외처리 등을 해주어야한다면 더욱 작성하기 어려워진다. 기존의 Future 인터페이스는 Java 5 에서 asynchronous computation 을 위해 추가되었지만, 여러 개의 작업을 합쳐서 작성하는데에는 많은 불편함이 있었다. Java 8 에서 그러한 문제를 해결하기 위해 CompletableFuture 클래스를 만들었다. 이는 Future 과는 다르게 다른 Asynchronous computation 끼리 조합이 가능하다.
public class CompletableFutureExample {
static ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
private static String process() {
try {
Thread.sleep(1000);
return "PROCESS";
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
public static void main(String[] args) throws Exception {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(CompletableFutureExample::process, executorService);
String value = cf.get();
System.out.println(value);
}
}
위와 같이 생성한 스레드풀 내에서 CompletableFuture 을 생성할 수도 있지만, 스레드풀을 지정하지 않고 생성할 수도 있다. 따로 지정해주지 않으면 기본 스레드풀로 ForkJoinPool 을 사용하게 된다. ForkJoinPool 은 Task 의 크기에 따라 분할(Fork)하고, 분할된 Task 가 처리되면 그것을 합쳐(Join) 리턴해준다. ForkJoinPool 안에 있는 모든 스레드들은 계속 자신이 나누어 할 일이 있는지 찾게되는데, 이를 work-stealing 이라고 표현한다. (이 외에도 지금까지 사용해보지 않은 ScheduledThreadPoolExecutor 이라는 것이 있다. 이는 어떤 작업을 일정 시간 지연 후 혹은 일정 시간 간격으로 주기적으로 실행해야 할 때 사용된다. 이에 대해서도 다시 알아볼 예정이다.)
private static void noReturnCF() throws InterruptedException, ExecutionException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("NO RETURN COMPLETABLE FUTURE (RUN ASYNC): " + Thread.currentThread().getName());
}, executorService);
future.get();
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("NO RETURN COMPLETABLE FUTURE : (RUN ASYNC): " + Thread.currentThread().getName());
});
future2.get();
}
runAsync 메소드는 리턴 값이 없을 때 사용한다. 반대로 리턴 값이 있을 때는 supplyAsync 메소드가 사용된다.
private static void yesReturnCF() throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return Thread.currentThread().getName();
}, executorService);
String v1 = future.get();
String v2 = future2.get();
System.out.println(v1);
System.out.println(v2);
}
thenApply, thenAccept, thenRun 메소드로 콜백을 적용할 수 있다.
private static void thenApplyTest() throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 100;
}).thenApply(i -> i + 1);
int x = future.get();
System.out.println(x);
}
thenApply 는 앞선 리턴 값을 받아서 다른 값을 리턴하는 메소드이다. 함수형 인터페이스 Function 을 파라미터로 받는다.
private static void thenAcceptTest() throws Exception {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "TEST thenAccept";
}
).thenAccept(s -> System.out.println(s));
future.get();
}
thenAccept 는 리턴 값을 받아 처리하지만, 리턴하는 값은 따로 없다. 함수형 인터페이스 Consumer 을 파라미터로 받는다.
private static void thenRunTest() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
{
try {
Thread.sleep(1000);
System.out.println("FIRST");
} catch (InterruptedException e) {
}
}).thenRun(() -> System.out.println("SECOND")
);
future.get();
}
thenRun 은 리턴 값을 받지 않고 다른 작업을 이어서 실행한다. 함수형 인터페이스 Runnable 을 파라미터로 받는다.
CompletableFuture 작업 간 조합을 진행하고자 할 때는 thenCompose, thenCombine, allOf, anyOf 메소드가 사용될 수 있다.
private static void thenComposeTest() throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "FUTURE1";
});
future.thenCompose(CompletableFutureExample2::thenComposeTest2);
}
private static CompletableFuture<Void> thenComposeTest2(String s) {
return CompletableFuture.runAsync(() -> {
System.out.println(s.length());
});
}
thenCompose 는 두 작업이 이어서 실행되도록 조합해주는 메소드이다. 앞선 작업의 결과를 받아서 사용할 수 있으며, 함수형 인터페이스 Function 을 파라미터로 받아 사용한다.
private static void thenCombineTest() throws Exception {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("FUTURE1");
return 3;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("FUTURE2");
return 5;
});
CompletableFuture<Integer> future = future1.thenCombine(future2, (f, s) -> f + s);
System.out.println(future.get());
}
thenCombine 은 두 작업을 독립적으로 실행하고, 둘 다 완료되었을 때 콜백을 실행한다. 이 또한 함수형 인터페이스 Function 을 파라미터로 받아 사용한다.
private static void allOfTest() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "FUTURE1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "FUTURE2";
});
List<CompletableFuture<String>> futures = List.of(future1, future2);
CompletableFuture<List<String>> result = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
result.get().forEach(System.out::println);
}
allOf 는 여러 작업을 동시에 실행하고, 모든 작업 결과에 콜백을 실행한다. 위에서 사용된 CompletableFuture 의 join 메소드는 CompletableFuture 이 완료될 때까지 현재 스레드를 블록하고, 완료되면 해당 결과를 반환하는 메소드이다. Future 의 get 메소드와 유사하지만, 해당 메소드는 체크드 에외인 InterruptedException 을 던지지 않는다.
public static void anyOfTest() throws Exception {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return "FUTURE 1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "FUTURE 2";
});
CompletableFuture<Void> future = CompletableFuture.anyOf(future1, future2).thenAccept(System.out::println);
future.get();
}
마지막으로 anyOf 는 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 대해서만 콜백을 실행한다.
Reference: