seraphyの日記

日記というよりは過去を振り返るときのための単なる備忘録

CompletableFutureの使い方の基本形とFutureTaskとの連携例

Java8から利用可能になったCompluteableFutureとは、他言語におけるDeferredとかPromiseパターンといわれるものと同等のものであり、これにより並列処理を直列的に記述できるようになる。(つまり超便利。)

基本的な使い方

もっとも簡単な使い方としては、CompletableFutureが用意している静的コンビニエンスメソッドであるrunAsyncか、supplyAsyncを使う。


たとえば、以下のようなものになる。

1つのタスクの完了後に処理を継続する例
  ExecutorService es = Executors.newCachedThreadPool();
...
  CompletableFuture<String> cf =
      CompletableFuture.supplyAsync(() -> heavyWork("job1"), es);

  cf.whenComplete((ret, ex) -> {
    if (ex == null) {
      // 成功した場合
      System.out.println("result=" + ret);
    } else {
      // 失敗した場合
      System.err.println("err=" + ex);
    }
  });

このように、CompletableFutureを用いると、「完了したら何かを行う」という処理を容易に記述できる。

2つ以上のタスクを同時並列に実行して、すべての待ち合わせる場合
  List<CompletableFuture<String>> cfs = Arrays.asList(
      CompletableFuture.supplyAsync(() -> heavyWork("job1"), es),
      CompletableFuture.supplyAsync(() -> heavyWork("job2"), es),
      CompletableFuture.supplyAsync(() -> heavyWork("job3"), es));

  CompletableFuture<Void> cf = CompletableFuture.allOf(
      cfs.toArray(new CompletableFuture[cfs.size()]));

  cf.whenComplete((ret, ex) -> {
    if (ex == null) {
      // すべて成功した場合
      String msg = cfs.stream().map(future -> {
        try {
          return future.get();
        } catch (Exception iex) {
          return iex.toString();
        }
      }).collect(Collectors.joining(","));
      System.out.println("result=" + msg);

    } else {
      // いずれかが失敗した場合(いずれか1つの例外のみ取得される)
      System.err.println("err=" + ex);
    }
  });

CompletableFuture#allOf(...)メソッドは、指定された全てのCompletableFutureの完了によって完了するVoid型のCompletableFutureを返す。(つまり、すべてのCompletableFutureが実行し終えることで完了となる。)


返されるCompletableFutureは結果がVoid型なので値を得ることはできないため、個別のCompletableFutureで確認する必要がある。

しかし、いずれかのCompletableFutureが失敗している場合は、少なくとも、その例外の1つは把握できる。(複数例外が発生しているかどうかは、個別に確認する必要がある。)

2つ以上のタスクを非同期、且つ"直列"に実行する場合
private ExecutorService es = Executors.newCachedThreadPool();
...

@FXML
public void onClickLoadButton() {
  CompletableFuture<String> work1 =
      CompletableFuture.supplyAsync(() -> heavyWork("job1"), es);
  CompletableFuture<String> work2 =
      work1.thenCompose(work1Result ->
    CompletableFuture.supplyAsync(() ->
         heavyWork("job2(" + work1Result + ")"), es));

  work2.thenAcceptAsync(result -> {
    // work1, work2が完了した場合
    // ※ UIスレッドで何らかの更新処理を行う
    System.out.println(result);
  }, Platform::runLater);
}

典型的なシチュエーションとしては、たとえば、JavaFXなどのGUIでボタン押下により、job1を非同期で開始し、job1で得られた結果をもとにjob2を起動して、それが完了したら何らかのUI更新を行う、というようなことが比較的容易に記述できる。

(上記例ならば、ボタン押下時はCompletableFutureを起動したら直ちにメソッドを抜けているのでUIスレッドがブロックされることはない。)


※ なお、2段目の結果を受け取るthenAcceptAsyncのExecutorにPlatform::runLaterを指定しているので、この部分はJavaFXのUIスレッドで実行されることになる。

メソッド名の末尾がAsyncになっているメソッドの意味

runAsync, whenCompleteAsyncのようにメソッドの末尾にasyncとあるものは、引数に指定した任意のExecutorで実行することを示している。

AsyncメソッドでExecutorを省略した場合は、ForkJoinPool.commonPool*1が使われる。*2 *3


Asyncがつかないものは、前段の完了後に前段と同じスレッドで続けて呼び出される。

CompletableFutureはイベントリスナではない。

runAsyncでCompletableFutureを受け取った時点で、すでに処理が開始されているので、
もし、whenCompleteAsyncを実行するまえに処理が完了していたらどうなるかと心配する向きもあるかもしれない。

しかし、処理が完了したあとにwhenComplete...を呼び出しても、きちんと処理が継続されるようになっている。

たとえば、コンビニエンスメソッドのcompletedFutureでは、すでに値が確定して完了済みのCompletableFutureを返すので、完了している状態に対して、あとからwhenCompleteなどを呼び出すしかないわけだが、もちろん、ちゃんと機能する。


CompletableFutureは、あくまでも前段の確定した状態に対して後続の処理を接続させる仕組みであり、イベントリスナのようなものではない、ということである。

"未完了"なCompletaleFutureを自前で作成してレガシーなスレッドを"完了可能"にする方法

CompletableFutureが標準で用意しているコンビニエンスメソッドはRunnableかSupplierを受け取るメソッドしかないが、たとえば、Callableを受け取ってCompletableFutureを返すにはどうしたら良いか?


これは比較的簡単に実装できる。

CompletableFutureは、明示的にCompletableFutureのインスタンスを構築することで、自前で完了を通知する仕組みを作ることができるようになっている。

作り方

多くの他のプラットフォームのPromise/Deferredのような仕組みと同様に、

  1. まずコンストラクタで未完了状態のCompletableFutureオブジェクトを作成する。
  2. 処理が完了したらcompleteを呼び出して結果を返して、完了状態にする。
  3. もし処理が失敗により終了したら、completeExceptionallyを呼び出して例外を返して、完了状態にする。

基本、これだけでOK.


たとえば、直接Threadを作成してstartしているようなレガシーな処理に、待ち合わせの仕組みを追加したい場合には、
以下のように未完了なCompletableFutrueを構築し、処理終了したら完了状態への遷移を明示的に行えば良い。

  CompletableFuture<String> cf = new CompletableFuture<>(); // 未完了状態で構築
  new Thread() {
    @Override
    public void run() {
      try {
        IntStream.range(0, 10)
          .peek(System.out::println)
          .forEach(idx -> heavyWork(idx));

        // 正常による完了状態への遷移
        cf.complete("done!");

      } catch (Throwable ex) {
        // 例外による完了状態への遷移
        cf.completeExceptionally(ex);
      }
    }
  }.start();

  cf.whenComplete((ret, ex) -> {
    // 完了時に呼び出される
    System.out.println("finished. ret=" + ret + ", ex=" + ex);
  });


このように、completeまたはcompleteExceptionallyによって正常または例外による完了状態への遷移を行うことができる。

また、複数スレッドでの操作の場合、状態遷移は「先勝ち」であり、また、一度確定した状態はあとから変更されることはない。*4


これにより、CompletableFutureで待ち合わせしている後続処理に対して結果を繋げる事ができる。

CompletableFuture.runAsyncのやっていることとは?

実際のところ、コンビニエンスメソッドであるrunAsyncやsupplyAsyncは、簡略化して言えば、引数で渡されたRunnableやSupplierを以下のようなRunnableでラップしてExecutorに渡しているだけといって良い。

public CompletableFuture<Void> runAsync(Runnable runnable) {
  CompletableFuture<Void> cf = new CompletableFuture<>();
  executor.execute(() -> {
    try {
      runnable.run();
      cf.complete(null);

    } catch (Throwable ex) {
      cf.completeExceptionally(ex);
    }
  });
  return cf;
}

または、

public <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  executor.execute(() -> {
    try {
      cf.complete(supplier.get());

    } catch (Throwable ex) {
      cf.completeExceptionally(ex);
    }
  });
  return cf;
}


これが基本形とわかれば、いろいろ応用ができるだろう。*5

Callableを引数にとるメソッドを作成してみる

ExecutorServiceには、CallableをとってFutureを返すメソッドがあるのに、なぜか、CompletableFutureにはCallableを引数に取るコンビニエンスメソッドがない。

しかし、前述のパターンで実装すれば良いだけなので難しくは無い。

public static <T> CompletableFuture<T> callableAsync(Callable<T> c, Executor e) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  e.execute(() -> {
    try {
      cf.complete(c.call());

    } catch (Throwable ex) {
      cf.completeExceptionally(ex);
    }
  });
  return cf;
}

既存のFutureTaskの置き換え

FutureTask類を使っているコードに対して、CompletableFutureによる待ち合わせを組み込みたい場合もあるかもしれない。


たとえば、ExecutorServiceを使う以下のようなコードがあった場合、

  ExecutorService es = Executors.newCachedThreadPool();
...
  Future<String> future = es.submit(() -> {
    Thread.sleep(1000);
    return "ok";
  });
...
  System.out.println(future.get());
...


CompletableFutrueFutureでもあるので、CompletableFuture.supplyAsyncで置き換えても、基本的な使い方は変わらない。

  ExecutorService es = Executors.newCachedThreadPool();
...
  CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
      throw new RuntimeException(ex);
    }
    return "ok";
  }, es);

  cf.whenComplete((ret, ex) -> {
    System.out.println("done.");
  });
...
  System.out.println(cf.get());
...

シンプルである。

通常の用途であれば、それほど労せず単純な置き換えで十分と思われる。

CompletableFutureのcancelはスレッドに割り込みを発生させない

ただし、1つ問題があり、返されるCompletableFutureからは開始されたタスクをキャンセル制御することができない。


CompletableFutureに対してcancelを指示すると、即時にキャンセル例外による例外終了状態となる。(completeExceptionally(new CancellationException())の呼び出しと同義)


しかし、これはCompletableFutureのステートだけが変わることであって、実際に処理中のスレッドに対して割り込みなどは全く行われないため、つまり、処理中のスレッドは、実行中のまま放置されることになる。


これは、CompletableFutureの設計思想なのだろう。


そもそも、CompletableFutureはタスクを多段にするのが容易であるから、1つ1つのタスクが重くならないように細切れで作るのがベターなのだとは思われる。(たとえば単発のI/Oのようなものは、そもそも割り込みが無意味なことが多いだろう。)

そうすれば、処理中のスレッドに割り込みをかけるまでもなく、次段以降の処理はスマートに中止される。


しかし、1つのタスク内でループ等により長期的なワークを実行するようなCompletableFutureではキャンセルによりスレッドに停止を要求するための仕組みは考える価値はあるかもしれない。

既存のFutureTaskとの連携

従来型のFutureTaskではキャンセル処理により処理中のスレッドに対して"割り込み"によって中止を通知することができる仕組みをもっている。


(FutueTaskは、runされた時点のスレッドを記憶することで、cancel時に、そのスレッドに対してinterruptを発行できるようになっている。また、処理終了時には実行中スレッドがなくなったことを記憶するので、たとえばスレッドを使い回しているスレッドプールなどでも、終わったタスクのcancelによって他のタスクを誤爆する恐れはない。)


キャンセルによる割り込みを可能としつつ、CompletableFutureの継続の仕組みを使いたい場合には、このFutureTaskと連携を行うことで実現できる。

シンプルな連携例

FutureTaskに、CompletableFutureによる待ち合わせを追加するには以下のように修正する感じになる。

  ExecutorService es = Executors.newCachedThreadPool();
...
  FutureTask<String> future = new FutureTask<>(() -> {
    Thread.sleep(1000);
    return "ok";
  });

  CompletableFuture<Void> cf = CompletableFuture.runAsync(future, es);
  cf.whenComplete((ret, ex) -> {
    System.out.println("done"); // 完了通知
  });

  // 成功時の結果、もしくは何らかの失敗理由は、ここで取得できる
  System.out.println(future.get()); 
...

ExecutorServiceのsubmitの既定の実装は、実際のところFutureTaskでRunnable/Callableをラップして、それをexecuteに渡しているだけだから、基本的には上記のようにFutureTaskによる置き換えで良い。


タスクの実行結果/エラーへのアクセスや、cancelなどのコントロールは引き続き、このFutureTaskによって行う。

しかし、これに加えて、タスクの終了をもって後段の処理を自動的に開始させる仕組みはCompletableFutureによって実現できるようになる。


※ この方法では、CompletableFutureではFutureTask#run()が処理を終えたことしか分からず、成功か失敗かの結果は判断できない。(結果はFutureTaskの内部でもっているため。)

もう少し手をいれてCompletableFutureで結果の取得ができるようにする。

CompletableFutureでFutureTaskの結果を取得できるようにするには、前述の基本パターンでいける。

  ExecutorService es = Executors.newCachedThreadPool();
...
  FutureTask<String> future = new FutureTask<>(() -> {
    Thread.sleep(1000);
    return "ok";
  });

  CompletableFuture<String> cf = futureTaskAsync(future, es);
  cf.whenComplete((ret, ex) -> {
    System.out.println("done. ret=" + ret + ", ex=" + ex); // 結果の取得
  });

/**
 * FutueTaskをCompletableFutureでラップする.
 */
public static <T> CompletableFuture<T> futureTaskAsync(
      FutureTask<T> ft, Executor e) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  e.execute(() -> {
    try {
      ft.run();
      cf.complete(ft.get());

    } catch (ExecutionException ex) {
      cf.completeExceptionally(ex.getCause());

    } catch (Throwable ex) {
      cf.completeExceptionally(ex);
    }
  });
  return cf;
}

FutureTaskの完了待ち後に、その結果をCompletableFutureに伝搬させている。

これでcancelによる割り込み以外はCompletableFutureとして扱えるようになる。


※ FutureTaskのget時に結果または例外を取得できるが、例外はExecutionException例外にラップされているので、CompletableFutureの結果に設定する場合には、ラップを解除するようにしている。

更に手をいれて、cancelもできるようにしてみる。

CompletableFutureでcancelしても、FutureTask側に伝わらないのが問題であった。

なので、CompletableFutureがキャンセルされたら、これをFutureTaskに明示的に伝えるようにすれば良い。

/**
 * FutueTaskをCompletableFutureでラップする.
 */
public static <T> CompletableFuture<T> futureTaskAsync(
      FutureTask<T> ft, Executor e) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  e.execute(() -> {
    try {
      ft.run();
      cf.complete(ft.get());

    } catch (ExecutionException ex) {
      cf.completeExceptionally(ex.getCause());

    } catch (Throwable ex) {
      cf.completeExceptionally(ex);
    }
  });

  cf.whenComplete((ret, ex) -> {
    if (ex instanceof CancellationException) {
      // CompletableFutureがキャンセル例外によって完了している場合
      if (!ft.isDone()) {
        // FutureTaskが、まだ完了していなければ割り込みをかける
        ft.cancel(true);
      }
    }
  });
  return cf;
}

これで、以下のようにしてキャンセルを実施できる。

  FutureTask<String> future = new FutureTask<>(() -> {
    try {
      Thread.sleep(1000);
      System.out.println("done!!");
      return "ok";

    } catch (InterruptedException ex) {
      System.out.println(ex); // 割り込みされた場合にプリントされる
      throw ex;
    }
  });

  CompletableFuture<String> cf = futureTaskAsync(future, es);
  cf.whenComplete((ret, ex) -> {
    System.out.println("done. ret=" + ret + ", ex=" + ex); // 結果の取得
  });

  cf.cancel(true); // キャンセル

JavaFXのTaskとCompletableFutureの連携

キャンセルを行いたい場合以外でも、既存のFutureTaskとの連携が有用な場合もある。

たとえば、JavaFXのTaskなどとの連携があげられる。


JavaFXには、UIへのコールバックサポートを持つ特殊なTaskである、javafx.concurrent.TaskというFutureTask派生クラスがある。

  Label labelMessage = new Label();
  ExecutorService es = Executors.newCachedThreadPool();
...    
  Task<String> uiTask = new Task<String>() {
    @Override
    protected String call() throws Exception {
      int mx = 10;
      for (int idx = 0; idx < mx; idx++) {
        updateMessage("process " + (idx + 1) + "/" + mx);
        Thread.sleep(300);
      }
      return "done.";
    }
  };

  // UIのラベルのテキストプロパティにuiTaskのmessageプロパティを接続する.
  // タスク実行スレッドからの通知はUIスレッドにマーシャリングされるため安全である.
  labelMessage.textProperty().bind(uiTask.messageProperty());

  // 非UIスレッドでのタスクの実行
  es.execute(uiTask);
...

JavaFXではアプリケーションスレッド以外からシーングラフに組み込み済みのオブジェクトを操作することは許されていないため、ワーカースレッドから直接ラベルテキストなどを変更することはできない。


しかし、javafx.concurrent.Taskクラスの内部から呼び出されるupdateMessage, updateTitle, updateProgressといったメソッドはアプリケーションスレッドで通知されるようにマーシャリングされているため、直接、UIに接続可能になっている。
(といっても、単に、内部でPlatform.runLaterしているだけだが。)


このTaskクラスの特性を活かしたまま、CompletableFutureによる待ち合わせに加えたい場合にはどうするか?


先に示したFutureTaskの例と同じようにすれば良い。

(ここでは割り込みは必要ないのでシンプルな方法を用いている。)

  ExecutorService es = Executors.newCachedThreadPool();
...    
  Task<String> uiTask = new Task<String>() {
    @Override
    protected String call() throws Exception {
    ....
    }
  };

  // UIのラベルのテキストプロパティにuiTaskのmessageプロパティを接続する
  labelMessage.textProperty().bind(uiTask.messageProperty());

  // 非UIスレッドでのタスクの実行とCompletableFutureの取得
  CompletableFuture<Void> cf = CompletableFuture.runAsync(uiTask, es);
  
  // 待ち合わせ後、UIスレッドで実行
  cf.whenCompleteAsync((ret, ex) -> {
    labelMessage.textProperty().unbind();
    labelMessage.setText("finished!");
  }, Platform::runLater);

これにより、uiTaskは、そのまま機能しつつ、CompletableFutureによる待ち合わせにも参加できるようになる。*6


(なお、javafx.concurrent.Taskクラスにはスレッドの開始終了イベントをハンドルできる仕組みが備わっているが、スレッドの完了を把握して後続の処理を行うという用途であれば、断然にCompletableFutureのほうが使いやすい。)

結論

CompletableFutureは、コンストラクタによる「不完全なCompletableFuture」による基本形と、その仕組みが分かれば、いろいろ応用が利くし、制限事項も理解しやすいように思われる。

他のFutureまわりの仕組みとは異質ではあるが、他にはない特性を持った、とても使い勝手の良いAPIだと思われる。


以上、メモ終了。

*1:システムが既定で持っている論理CPU分のスレッドプール

*2:なお、ForkJoinPool.commonPoolはサンドボックス状態になっておりパーミッションが必要な処理はできないことに注意。

*3:また、JavaEEではアプリケーションコンテナで管理されたExecutorを明示的に使う必要がある。

*4:CompletableFutureは完了が確定したら、その後、状態が変わらないのが原則だが、obtrudeValue, obtrudeExceptionというイレギュラーなメソッドもある。

*5:なお、FingBugsではCompletableFutureのcomplete(null)を引数がnullによる潜在エラーとして報告するバグがあるようである。(2017/3現在) https://github.com/findbugsproject/findbugs/issues/79 ちなみにjdkの内部では内部用のcompleteNullというメソッドを呼んでいるため問題にはならない。

*6:ちなみに、CompletableFutureの***async系メソッドは引数としてExecutorを取るが、Executorは単一メソッドインターフェイス(single abstract method = SAM)であるため、メソッド参照によるPlatform.runLaterを、そのまま指定できる。