기록/JAVA

[JAVA8] CompletableFuture

jeongdalma 2021. 1. 17. 14:10

자바에서 비동기(Asynchronous) 프로그래밍을 가능케하는 인터페이스

  • Future를 사용해서 어느정도 가능했지만 하기 힘든일 들이 많았다.
    • Future를 외부에서 완료 시킬 수 없다. 취소하거나 , get()에 타임아웃을 설정할 수는 있다.
    • 블록킹 코드 get()을 사용하지 않고서는 작업이 끝났을 때 콜백을 실행할 수 없다.
    • 여러 Future를 조합할 수 없다.
    • 예) Event 정보 가져온 다음 Event에 참석하는 회원 목록 가져오기
    • 예외 처리용 API를 제공하지 않는다.

 

CompletableFuture - CompletableFuture (Java Platform SE 8 ) (oracle.com)

 

비동기로 작업 실행하기

  • runAsync() - 리턴 값이 없는 경우
  • supplyAsync() - 리턴 값이 있는 경우
  • 원하는 Executor(Thread Pool)를 사용해서 실행할 수도 있다.
  • 기본은 ForkJoinPool.commonPool()
    • ForkJoinPool - JAVA7

 

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        completableFuture.complete("test");
        // 위와 동일한 코드이다.
        CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("test");

        // runAsync - 리턴 값이 없는 경우
        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
        });
        completableFuture2.get();
        // 출력
        // Hello ForkJoinPool.commonPool-worker-3

        // supplyAsync() - 리턴 값이 있는 경우
        CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            return "return Value!!!";
        });
        System.out.println(completableFuture3.get());
        // 출력
        // return Value!!!
    }

 

콜백 제공하기

  • thenApply(Function) - 리턴 값을 받아서 다른 값으로 바꾸는 콜백
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return : " + Thread.currentThread().getName());
            return "return Value!!!";
        }).thenApply((s) -> {
            System.out.println("Then Apply : " + Thread.currentThread().getName());
            return s.toUpperCase();
        });

        // get을 호출하지 않으면 아무일도 일어나지 않는다.
        String s = completableFuture.get();
        System.out.println(s);
        // 출력
        // Return : ForkJoinPool.commonPool-worker-3
        // Then Apply : ForkJoinPool.commonPool-worker-3
        // RETURN VALUE!!!
    }

 

  • thenAccept(Consumer) - 리턴 값으로 또 다른 작업을 처리하는 콜백 ( 리턴없이 )
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return : " + Thread.currentThread().getName());
            return "return Value!!!";
        }).thenAccept((s) -> {
            System.out.println("Then Accept : " + Thread.currentThread().getName());
            System.out.println("Then Accept To UpperCase : " + s.toUpperCase());
        });

        completableFuture.get();
        // 출력
        // Return : ForkJoinPool.commonPool-worker-3
        // Then Accept : ForkJoinPool.commonPool-worker-3
        // Then Accept To UpperCase : RETURN VALUE!!!
    }

 

  • thenRun(Runnable) - 다른 작업을 처리하는 콜백
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return : " + Thread.currentThread().getName());
            return "return Value!!!";
        }).thenRun(() -> {
            // Runnable
            System.out.println("Then Run : " + Thread.currentThread().getName());
        });

        completableFuture.get();
        // 출력
        // Return : ForkJoinPool.commonPReturn : ForkJoinPool.commonPool-worker-3
        // Then Run : ForkJoinPool.commonPool-worker-3
    }

 

  • 콜백 자체를 또 다른 Thread에서 실행할 수 있다.
    • ForkJoinPool을 사용하지 않고 개발자가 직접 만든 Thread를 제공할 수도 있다.
    • ExecutorService를 새로 생성하여 매개변수로 전달하면 된다.
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Return : " + Thread.currentThread().getName());
            return "return Value!!!";
        } , executorService).thenRun(() -> {
            // Runnable
            System.out.println("Then Run : " + Thread.currentThread().getName());
        });

        completableFuture.get();
        executorService.shutdown();
//        출력
//        Return : pool-1-thread-1
//        Then Run : pool-1-thread-1
    }

 

조합하기

  • thenCompose() - 두 작업이 서로 이어서 실행하도록 조합
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        });

        // hello.thenCompose(s -> getWorld(s));
        CompletableFuture<String> helloWorld = 
                hello.thenCompose(AppForCompletableFuture::getWorld);
        System.out.println(helloWorld.get());

        // 출력
        // Hello : ForkJoinPool.commonPool-worker-3
        // World : ForkJoinPool.commonPool-worker-5
        // Hello  World
    }
    private static CompletableFuture<String> getWorld(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return message + " World";
        });
    }

 

  • thenCombine() - 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> result = hello.thenCombine(world, (h , w) -> h + " " + w);
        System.out.println(result.get());
        // 출력
        // Hello : ForkJoinPool.commonPool-worker-3
        // World : ForkJoinPool.commonPool-worker-5
        // Hello  World
    }

 

  • allOf() - 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

allOf를 사용하여 작업의 결과를 List로 반환받기

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        List<CompletableFuture> futures = Arrays.asList(hello , world);

        CompletableFuture[] futuresArray
                = futures.toArray(new CompletableFuture[futures.size()]);

        // 결과 타입들이 모두 동일해야한다.
        CompletableFuture<List<Object>> listCompletableFuture = 
                CompletableFuture.allOf(futuresArray).thenApply(v -> {
                    return futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());
                });

        listCompletableFuture.get().forEach(System.out::println);

        // 출력
        // Hello : ForkJoinPool.commonPool-worker-3
        // World : ForkJoinPool.commonPool-worker-5
        // Hello
        // World
    }

 

  • anyOf() - 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행 
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<Void> future = 
                CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
        future.get();

        // 출력
        // Hello : ForkJoinPool.commonPool-worker-3
        // World : ForkJoinPool.commonPool-worker-5
        // Hello
    }

 

예외처리

  • exceptionally(Function)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if(throwError){
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        }).exceptionally(exceptionType -> {
            System.out.println("Exception Type : " + exceptionType);
            return "Error!";
        });

        System.out.println(hello.get());
        
        // 출력
        // Exception Type : java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
        // Error!
    }

 

  • handle(BiFunction)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean throwError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if(throwError){
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello ";
        }).handle((result , exceptionType) -> {
            // 첫번째 파라미터 - 정상적인 경우 반환되는 결과 값
            // 두번째 파라미터 - 예외 발생시 예외
            if(exceptionType != null){
                System.out.println("Exception Type : " + exceptionType);
                return "ERROR !!!";
            }
            return result;
        });

        System.out.println(hello.get());
        // 예외 발생 시 "ERROR !!!" 를 반환
        // 에외가 발생하지 않았을 시 "Hello" 반환
    }

 

 

 

참고

ForkJoinPool (Java Platform SE 8 ) (oracle.com)

CompletableFuture (Java Platform SE 8 ) (oracle.com)

 

 

 

더 자바, Java 8 - 인프런

자바 8에 추가된 기능들은 자바가 제공하는 API는 물론이고 스프링 같은 제 3의 라이브러리 및 프레임워크에서도 널리 사용되고 있습니다. 이 시대의 자바 개발자라면 반드시 알아야 합니다. 이

www.inflearn.com

 

댓글수0