HystrixCommand를 사용하다 보니, 자연스럽게 #toObservable을 통해 Observable을 사용하게 됨. 하지만, 실제로 사용하는 방식을 살펴보면, CompletableFuture로 충분해 보였음. 그러면서 ObservableCompletableFuture의 차이가 뭔지 궁금해짐. 대략적으로만 알던 CompletableFuture를 좀 더 이해할 필요가 있었고, 이 때 Java 8: Definitive guide to CompletableFuture 문서가 도움이 됨. 필요한 부분만을 번역하여 기록해 둠. 다음 글에서는 Observable에 관련된 글 번역 혹은 요약 예정.

감싸진 값의 추출/수정

일반적으로 퓨처Future는 다른 스레드에서 실행되는 코드 조각을 가리킨다. 하지만 항상 그런 것은 아니다. 가끔은 JMS 메시지 도착과 같이, 곧 발생하리라 기대되는 이벤트를 나타내기도 한다. 즉, Future<Message>가 존재하더라도 실행 중인 비동기 작업은 없을 수 있다. 그리고 JMS 메시지가 도착하면 이 퓨처를 완료complete(resolve)시킨다. 이게 바로 이벤트 드리븐이다. 단지 CompletableFuture를 만들고, 클라이언트에게 전달해 준 뒤, 작업이 완료되었다고 생각되면 퓨처에 대해 complete()을 호출하기만 하면 된다. 퓨처의 완료를 기다리던 클라이언트는 이제 다음 작업을 계속 할 수 있다.

초보자를 위해, 간단한 CompletableFuture를 만들어 클라이언트에게 전달하는 코드를 살펴보자.

public CompletableFuture<String> ask() {
  final CompeltableFuture<String> future = new CompletableFuture<>();
  // ...
  return future;
}

이 퓨처에는 Callable<String>도 연관되어 있지 않고, 스레드 풀이나 비동기 작업도 없다. 만약 클라이언트 코드가 ask().get()을 호출하면, 클라이언트는 영원히 대기하게 된다. 완료 콜백을 등록해준다고 하더라도, 영원히 실행되지 않을 것이다. 그래서 어떻게 하란 말인가?

future.complete("42");

위 코드를 실행하면 대기중이던 모든 클라이언트의 Future.get() 코드가 결과 문자열을 받게 된다. 물론, 완료 콜백도 즉시 수행된다. 이는 앞으로 일어날 일(반드시 어떤 스레드에서 실행되야 하는 연산일 필요는 없다)을 표현할 때 유용하다. CompletableFuture.complete()은 오직 한 번만 호출될 수 있으며, 이어지는 호출들은 무시된다. 하지만 CompletableFuture.obtrudeVale(...)라는 뒷구멍이 존재한다. 이 함수는 이미 제공된 Future의 이전 값을 오버라이드 한다. 주의해서 사용해야 한다.

종종 실패를 감지하고 싶을 때가 있다. 잘 알다시피 Future 객체는 래핑된 결과나 예외를 다룰 수 있다. 예외를 전달하고 싶다면, CompletableFuture.completeExceptionally(ex)를 사용하면 된다. completeExceptionally가 호출되면 모든 클라이언트의 대기는 해제되고, get()에서 예외가 일어난다. 예외를 다루는 방식이 조금 다르긴 하지만 CompletableFuture.join() 메소드도 있다. 일반적으로 둘은 동일하게 취급된다. 그리고 마지막으로 CompletableFuture.getNow(valueIfAbsent) 메소드가 있다. 이 메소드는 클라이언트를 대기시키지 않는다. 그리고 만약, Future가 완료되지 않은 경우에 호출되면 기본 값을 반환한다. 너무 많이 기다리지 않아야 하는 시스템을 만들 때 유용하다.

마지막으로 completedFuture(value)라는 static 유틸리티 메소드가 있다. 이는 이미 완료된 Future 객체를 반환한다. 테스트 혹은 어댑터 레이어를 작성할 때 유용할 수 있다.

CompletableFuture의 생성과 획득

자 그런데, CompletableFuture는 직접 생성만 가능한가? 물론 아니다. 다른 퓨처들 처럼, 이미 존재하는 작업을 CompletableFuture로 감쌀 수 있다. 아래의 팩토리 메소드들로 말이다.

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

Executor를 인수로 취하지 않는 ...Async들은 ForkJoinPool.commonPool()(JDK 8에서 범용으로 도입된)을 사용한다. 이는 CompletableFuture 클래스에 있는 대부분의 메소드에도 해당된다. runAsync()는 간단하다. Runnable을 인수로 받고, 따라서 CompletableFuture<Void>를 반환한다. Runnable은 아무 것도 반환하지 않기 때문이다. 비동기 작업을 통해 결과를 받아야 하는 경우라면 Supplier<U>를 사용하라.

final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...long running...
        return "42";
    }
}, executor);

하지만 우리에겐 Java 8 람다가 있다!

final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  //...long running...
  return "42";
}, executor));

심지어는 아래와 같이 할 수 있다.

final CompletableFuture<String> future = 
    CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);

이 글은 프로젝트 람다에 관한 것이 아니다. 하지만 광범위하게 람다를 사용할 것이다.

CompletableFuture의 변환 (thenApply)

앞서 필자가 CompleteableFutureFuture보다 좋다고 했는데, 아직 이유는 설명하지 않았다. 간단히 말하면, CompleteableFuture는 모나드monad이고 펑터functor이기 때문이다. 아마도 이 이야기가 도움이 되지 않았으리라. 스칼라와 자바스크립트는 모두 퓨처가 완료될 때 실행될 콜백을 등록할 수 있다. 준비될 때까지 기다리지 않아도 된다. 대신 이렇게 말할 수 있다. “결과가 도착하면 그 결과와 함께 이 함수를 실행시켜줘.” 게다가, 이런 함수들을 스택에 쌓거나 결합하는 일 등이 가능하다. 예를 들면, StringInteger로 변환하는 함수가 있다면, CompletableFuture<String>CompletableFuture<Integer>로 쉽게 변환할 수 있다. 함수를 뜯어 고치지 않고서도 말이다. 이는 thenApply() 등의 메소드로 가능하다.

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

이전에 언급했듯이 ...Async 버전은 CompletableFuture의 대부분 작업에 제공된다. 따라서, 이어지는 섹션에서의 설명은 생략한다. 단지 이것만 기억하라. 퓨처가 완료되면 첫 번째 메소드는 동일한 스레드 내에서 함수를 적용시키고, 나머지 2개 메소드는 별도의 스레드 풀에서 비동기로 적용한다.

이제 thenApply가 어떻게 동작하는지 살펴보자.

CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

혹은 아래처럼 한 문장으로도 가능하다.

CompletableFuture<Double> f3 = 
    f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

String에서 Integer로, 그리고 난 뒤 Double로 변환되는 순서가 잘 드러난다. 하지만 가장 중요한 것은, 이런 변환들이 즉각적으로 실행되거나 블럭킹되지 않는다는 점이다. 이들은 단지 기억될 뿐이고 f1이 완료될 때 실행된다. 일부 변환 작업의 소요 시간이 길다면, 별도의 Executor를 제공해서 비동기로 실행시킬 수도 있다. 이들 작업은 스칼라의 모나딕 map과 동등함에 주목하라.

완료 시 코드 실행 (thenAccept/thenRun)

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

이 두 메소드는 퓨처 파이프라인에서의 전형적인 “마지막” 단계이다. 이들은 퓨처 결과가 구해지면, 소비할 수 있게 도와준다. thenAccept()는 그 마지막 값을 제공하지만, thenRunRunnable을 실행, 즉 연산 결과에 접근이 불가하다.

future.thenAcceptAsync(db1 -> log.debug("Result: {}", db1), executor);
log.debug("Continuing");

…Async의 변형들은 두 메소드에 대해서도 제공된다. 또한, executor를 인자로 받는 것도 있고 받지 않는 것도 있다. 여기서 중요한 것은 thenAccept()/thenRun() 메소드가 블럭킹을 일으키지 않는다는 점이다(명시적으로 executor가 없다고 하더라도 말이다). 이들을 마치 퓨처에 할당된 이벤트 리스너/핸들러처럼 여기길 바란다. 그리고 곧 실행될 것이다. 물론, “Continuing” 메시지가 먼저, 그리고 즉각적으로 나타난다. future가 아무리 빨리 완료된다고 하더라도 말이다.

단일 CompletableFuture의 에러 핸들링

지금까지 우리는 연산 결과에 대해서만 살펴보았다. 예외는 어떻게 해야 할까? 이들 역시 비동기로 다룰 수 있을까? 물론이다!

CompletableFuture<String> safe =
  future.exeptionally(ex -> "We have a problem: " + ex.getMessage());

exceptionally()에서 인자로 취한 함수는 future가 예외를 던질 때 실행된다. 여기서 이 예외를 다시 Future의 타입과 호환되는 값으로 변환할 수 있다. safe의 이어지는 변환에는 예외가 전파되는 대신 함수가 반환하는 문자열을 받게 된다.

좀 더 유연한 접근법은 handle()을 사용하는 것이다. 이는 정상적인 결과와 예외 모두를 인자로 받는다.

CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
  if (ok != null) {
    return Integer.parseInt(ok);
  } else {
    log.warn("Problem", ex);
    return -1;
  }
});

handle()은 항상 호출되며, 정상적인 결과나 예외 인자 둘 중 하나는 널값이 아니다. 결국, 한 번에 캐치하는 전략이다.

두 개의 CompletableFuture 조합

한 개의 CompletableFuture를 비동기로 처리하는 것이 멋지긴 하지만, 다수의 퓨처가 다양한 방식으로 조합될 때 더 큰 힘을 발휘한다.

두 개의 퓨처 조합/체이닝 (thenCompose())

퓨처의 값에 대해 실행되지만, 반환되는 값이 퓨처인 경우가 있다. CompletableFuture는 충분히 똑똑해서, 함수의 결과를 CompletableFuture<CompletableFuture<T>>로 반환하는 대신, 최상위 레벨의 퓨처를 반환해 줄 수 있다. 그런 측면에서 thenCompose() 메소드는 스칼라에서의 flatMap과 동등하다.

<U> CompletableFuture<U> thenCompose(Function<? super T, CompletableFuture<U>> fn);

아래의 예제에서는 calculateRelevance() 함수가 적용되는 부분에서, thenApply()(map)thenCompose()(flatMap)의 차이, 그리고 타입들을 유심히 살펴보라. 물론 thenCompose에도 …Async 변형이 존재한다.

CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =
  docFuture.thenApply(this::claculateRelevance);
CompletableFuture<Double> relevanceFuture =
  docFuture.thenCompose(this::calculateRelevance);

private CompletableFuture<Double> calculateRelevance(Document doc) // ...

thenCompose()는 튼튼한 비동기 파이프라인을 만들 때 필수적인 메소드이다. 중간 단계들을 기다리거나 블럭킹 시키지 않으면서 말이다.

두 개의 퓨처 변환 (thenCombine())

thenCompose가 하나의 퓨처를 다른 의존 퓨처에 체이닝하는데 사용되는 한편, thenCombine은 두 개의 독립적인 퓨처를 결합(두 개 모두 완료되면)해준다.

<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U other, BiFunction<? super T,? super U,? extends V> fn);

두 개의 CompletableFuture가 있다고 상상해보자. 하나는 Customer를 불러오고 다른 하나는 가까운 Shop 정보를 불러온다. 이들은 서로 독립적으로 완료되지만, 둘 모두가 완료되면, 이들의 값을 Route 계산하는데 활용하고 싶다. 여기 그 예제가 있다.

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture = 
  customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));

//...
 
private Route findRoute(Customer customer, Shop shop) //...

Java 8에서는 (cust, shop) -> findRoute(cust, shop)this::findRoute 메소드 레퍼런스로 단순화할 수 있다.

customerFuture.thenCombine(shopFuture, this::findRoute);

두 개의 CompletableFuture가 모두 완료되기를 기다리기

새로운 CompletableFuture를 만들어 두 개의 결과를 조합하는 대신, 작업이 완료되면 통지받는 간단한 방법도 있다. 이 경우 thenAcceptBoth()/runAfterBot() 등의 메소드를 사용한다. thenAccept()thenRun()과 유사하지만 한 개 대신 두 개의 퓨처를 기다린다.

<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block);
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action);

앞선 예제에서 처럼, 새로운 CompletableFuture<Route>를 만드는 대신, 즉각적으로 이벤트 전송이나 GUI 리프레시 등을 하고 싶을 수 있다. thenAcceptBoth를 이용하면 쉽다.

customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
  final Route route = findRoute(cust, shop);
  //refresh GUI with route
});

이렇게 묻는 사람들이 있을지도 모르겠다. “두 퓨처를 간단히 블럭시키면 되지 않을까?” 아래처럼 말이다.

Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

물론 가능한 일이다. 하지만 CompletableFuture를 사용하는 것은 비동기적으로, 이벤트 주도 프로그래밍 모델을 사용하기 위함이다. 블럭킹되거나 성급하게 결과를 기다리지 않고 말이다. 따라서, 위 2개의 코드 블럭은 동등하긴 하나, 후자의 것은 불필요하게 스레드의 실행을 점유해 버린다.

먼저 완료되는 CompletableFuture 기다리기

CompletableFuture는 다수의 퓨처 중 가장 먼저 완료된 것의 결과를 받을 수도 있다. 결과 타입이 동일한 두 개의 작업이 있고, 어떤 작업이 먼저 끝나는지 보다 응답 시간이 중요할 때, 이 기능이 유용할 수 있다.

CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block);
CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action);

두 개의 시스템을 통합해야 한다고 가정해보자. 하나는 빠른 평균 응답 시간을 가지지만 들쭉 날쭉하고, 다른 하나는 느리지만 일관된 응답 시간을 가진다. 이 때 취할 수 있는 한 가지 방법이 있다. 둘을 동시에 호출한 뒤 하나가 완료되길 기다리는 것이다. 일반적으로는 첫 번째가 그 대상이 되고, 다소 느려지더라도 두 번째 작업에 의해 예측 가능한 수준에서 완료된다.

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
  System.out.println("Result: " + s);
});

여기서 sfetchFast() 혹은 fetchPredictably()의 결과 문자열이다. 그게 누군지는 알 수도 없고 신경쓰지도 않는다.

먼저 완료된 결과 변환

applyToEither()acceptEither()의 형이다. 후자는 퓨처의 결과를 소비하는 반면, applyToEither()는 새로운 퓨처를 반환한다. 그리고 마찬가지로, 두 개의 퓨처 중 하나라도 완료되면 함께 완료된다.

<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn);

fn 함수는 먼저 완료된 퓨처의 결과에 대해 실행된다. 필자 개인적으로는 이 특수화된 메소드의 목적을 모르겠다. 다음과 같이 쉽게 사용할 수 있는 방법이 있는데도 말이다.

fast.applyToEither(predictable).thenApply(fn);

API에 명시되어 있긴 하지만, 실제로 필요로 하지는 않으므로, 단순히 Function.identity()라는 표시자를 사용했다.

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone =
  fast.applyToEither(predictable, Function.<String>identity());

firstDone 퓨처는 이제 다른 사용자에게 전달되고, 사용자는 두 개의 퓨처가 숨겨져 있다는 사실을 모른다. 단지 퓨처가 완료되길 기다릴 뿐이다. applyToEither()만이 먼저 끝나는 퓨처로부터 결과를 전달 받는다.

다수의 CompletableFuture 조합

두 개의 퓨처가 완료되길 기다리는 법(thenCombine()), 그리고 둘 중에 먼저 완료되는 퓨처만을 기다리는 법(applyToEither())도 알았다. 하지만 더 많은 퓨처에 대해서는 어떻게 해야 할까? 이 때 아래의 static 헬퍼 메소드를 사용할 수 있다.

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

allOf()는 퓨처를 배열로 받고 한 개의 퓨처를 반환한다. 그리고 배열로 받은 모든 퓨처가 완료될 때, 반환된 퓨처도 완료된다. 한편, anyOf()는 배열로 받은 퓨처 중 하나라도 먼저 완료되면, 반환된 퓨처를 완료시킨다. 반환된 퓨처의 제너릭 타입에 주목하라. 아마도 당신이 기대한 것은 아니었을 것이다. 다음 글에서 이 이슈에 대해 다룰 것이다.

번역을 마치며

원문서가 워낙 설명을 잘해줘서 종종 잊곤 했지만, CompletableFuture가 그리 직관적인 API를 제공하고 있다고 생각되지 않는다. 다른 비동기 API들이 상대적으로 익히고 사용하기에 쉬웠기 때문이다. 물론, 역량 부족일 수도 있고, 제대로 API를 살펴보지 않아서 그럴 수도 있다. 그 외에도 얼마든지 이유가 있겠지만, 어쨌든 당분간은 약간의 의구심과 함께 익히고 사용하려 한다. 자, 여기까지가 끝이다. 참으로 어색한 급 마무리지만, 공부가 목적인 번역이었으니, 이대로 그냥 끝맺음을 :)