Backend/Java

Java - CompletableFuture

elevne 2023. 6. 15. 15:32

비동기적인, 멀티스레드를 활용한 프로그램을 작성할 때 callback 함수들을 많이 작성하게 되고 그러한 코드들은 작성하기 어려워진다. 만약 그 콜백 내에서 예외처리 등을 해주어야한다면 더욱 작성하기 어려워진다. 기존의 Future 인터페이스는 Java 5 에서 asynchronous computation 을 위해 추가되었지만, 여러 개의 작업을 합쳐서 작성하는데에는 많은 불편함이 있었다. Java 8 에서 그러한 문제를 해결하기 위해 CompletableFuture 클래스를 만들었다. 이는 Future 과는 다르게 다른 Asynchronous computation 끼리 조합이 가능하다. 

 

 

 

public class CompletableFutureExample {

    static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
    );

    private static String process() {
        try {
            Thread.sleep(1000);
            return "PROCESS";
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(CompletableFutureExample::process, executorService);
        String value = cf.get();
        System.out.println(value);
    }

}

 

 

result

 

 

위와 같이 생성한 스레드풀 내에서 CompletableFuture 을 생성할 수도 있지만, 스레드풀을 지정하지 않고 생성할 수도 있다. 따로 지정해주지 않으면 기본 스레드풀로 ForkJoinPool 을 사용하게 된다. ForkJoinPool 은 Task 의 크기에 따라 분할(Fork)하고, 분할된 Task 가 처리되면 그것을 합쳐(Join) 리턴해준다. ForkJoinPool 안에 있는 모든 스레드들은 계속 자신이 나누어 할 일이 있는지 찾게되는데, 이를 work-stealing 이라고 표현한다. (이 외에도 지금까지 사용해보지 않은 ScheduledThreadPoolExecutor 이라는 것이 있다. 이는 어떤 작업을 일정 시간 지연 후 혹은 일정 시간 간격으로 주기적으로 실행해야 할 때 사용된다. 이에 대해서도 다시 알아볼 예정이다.)

 

 

private static void noReturnCF() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("NO RETURN COMPLETABLE FUTURE (RUN ASYNC): " + Thread.currentThread().getName());
    }, executorService);
    future.get();

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        System.out.println("NO RETURN COMPLETABLE FUTURE : (RUN ASYNC): " + Thread.currentThread().getName());
    });
    future2.get();
}

 

 

result

 

 

runAsync 메소드는 리턴 값이 없을 때 사용한다. 반대로 리턴 값이 있을 때는 supplyAsync 메소드가 사용된다.

 

 

private static void yesReturnCF() throws InterruptedException, ExecutionException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return Thread.currentThread().getName();
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        return Thread.currentThread().getName();
    }, executorService);
    String v1 = future.get();
    String v2 = future2.get();
    System.out.println(v1);
    System.out.println(v2);
}

 

 

result

 

 

 

thenApply, thenAccept, thenRun 메소드로 콜백을 적용할 수 있다.

 

 

private static void thenApplyTest() throws Exception {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        return 100;
    }).thenApply(i -> i + 1);
    int x = future.get();
    System.out.println(x);
}

 

 

result

 

 

thenApply 앞선 리턴 값을 받아서 다른 값을 리턴하는 메소드이다. 함수형 인터페이스 Function 을 파라미터로 받는다.

 

 

private static void thenAcceptTest() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                return "TEST thenAccept";
            }
    ).thenAccept(s -> System.out.println(s));
    future.get();
}

 

 

result

 

 

thenAccept리턴 값을 받아 처리하지만, 리턴하는 값은 따로 없다. 함수형 인터페이스 Consumer 을 파라미터로 받는다.

 

 

private static void thenRunTest() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
    {
        try {
            Thread.sleep(1000);
            System.out.println("FIRST");
        } catch (InterruptedException e) {
        }
    }).thenRun(() -> System.out.println("SECOND")
    );
    future.get();
}

 

 

result

 

 

thenRun리턴 값을 받지 않고 다른 작업을 이어서 실행한다. 함수형 인터페이스 Runnable 을 파라미터로 받는다.

 

 

CompletableFuture 작업 간 조합을 진행하고자 할 때는 thenCompose, thenCombine, allOf, anyOf 메소드가 사용될 수 있다.

 

 

private static void thenComposeTest() throws Exception {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return "FUTURE1";
    });
    future.thenCompose(CompletableFutureExample2::thenComposeTest2);
}

private static CompletableFuture<Void> thenComposeTest2(String s) {
    return CompletableFuture.runAsync(() -> {
        System.out.println(s.length());
    });
}

 

 

result

 

 

thenCompose두 작업이 이어서 실행되도록 조합해주는 메소드이다. 앞선 작업의 결과를 받아서 사용할 수 있으며, 함수형 인터페이스 Function 을 파라미터로 받아 사용한다.

 

 

private static void thenCombineTest() throws Exception {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        System.out.println("FUTURE1");
        return 3;
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("FUTURE2");
        return 5;
    });
    CompletableFuture<Integer> future = future1.thenCombine(future2, (f, s) -> f + s);
    System.out.println(future.get());
}

 

 

result

 

 

thenCombine두 작업을 독립적으로 실행하고, 둘 다 완료되었을 때 콜백을 실행한다. 이 또한 함수형 인터페이스 Function 을 파라미터로 받아 사용한다.

 

 

private static void allOfTest() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        return "FUTURE1";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        return "FUTURE2";
    });
    List<CompletableFuture<String>> futures = List.of(future1, future2);
    CompletableFuture<List<String>> result = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[futures.size()]))
            .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    result.get().forEach(System.out::println);
}

 

 

result

 

 

allOf여러 작업을 동시에 실행하고, 모든 작업 결과에 콜백을 실행한다. 위에서 사용된 CompletableFuturejoin 메소드는 CompletableFuture 이 완료될 때까지 현재 스레드를 블록하고, 완료되면 해당 결과를 반환하는 메소드이다. Future 의 get 메소드와 유사하지만, 해당 메소드는 체크드 에외인 InterruptedException 을 던지지 않는다.

 

 

public static void anyOfTest() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        return "FUTURE 1";
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        return "FUTURE 2";
    });

    CompletableFuture<Void> future = CompletableFuture.anyOf(future1, future2).thenAccept(System.out::println);
    future.get();
}

 

 

result

 

 

마지막으로 anyOf 는 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 대해서만 콜백을 실행한다.

 

 

 

 

 

 

 

 

Reference:

https://mangkyu.tistory.com/263