seraphyの日記

日記というよりは過去を振り返るときのための単なる備忘録

ForkJoinPoolとForkJoinTaskの特徴と使い方

Fork/Joinとは?

JavaSE7でサポートされるjava.util.concurrent.ForkJoinPoolは、ExecutorServiceの一員であり、一見するとThreadPoolExecutorに似たようなものに思えるが、実際は全く別の、異質のものである。


ForkJoinPoolは、ありていに言えばWork-stealingアルゴリズムの実装であり、無数のタスクを無駄なく論理CPU分のスレッドに割り当てるものである。

ForkJoinPoolはデフォルトでは論理CPU数分のスレッドプールをもつように構築される。

そして、あるタスクが完了するか、もしくは待ちになったら、すぐに別のタスクがアクティブとなり、常に、論理CPU数分のスレッドだけがアクティブになるように調整される。


また、ForkJoinPoolに入れたForkJoinTaskは、そのタスクの中で新たなタスクをforkすると、同じForkJoinPool内にタスクを予約する。
このとき、タスクは既定ではStackのようにタスクが積まれるため、再帰的な計算をさせるのに向いている。


ForkJoinTaskのタスクは他のタスクと連携しあっており、タスク内でforkした子タスクをjoinして結果を待つ場合には、そのスレッドは「待ち」に入るのではなく、タスクが「中断」された状態となり、スレッドは他のタスクの処理に回される(Work-stealing)ようになっている。

(つまり、常に論理CPUに空きがでないようにタスクが割り当てられ、joinしても、スレッドは待ち状態にならない、ということでもある。)


このようなWork-stealing型の他の言語の処理ライブラリとしては、C/C++やFortanで広く使えるOpenMPや、MicrosoftのPPL(並列パターン ライブラリ)のようなものがある。


また、ForkJoinTaskでは、forkしてjoinする場合、最後にforkしたタスクがまだスレッドに割り当てられていなければ、join後に、そのまま同じスレッドでforkした処理を継続する、つまりスレッドを切り替える必要がない、といった最適化を施すことができるようになっているようである。



(なお、ForkJoinTask#invokeは、forkとjoinを同時に行うコンビニエンスメソッドである。)


このように、ForkJoinPoolでキモとなるのは、その名のとおり「fork」と「join」である。

とりわけjoinによってタスクが中断してスレッドが別のforkしたタスクの処理に回されるところが重要である。

実験コード

package jp.seraphyware.forkjoinsample;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    /**
     * タスク内からタスクを生成する最大深さ
     */
    private static final int maxDepth = 4;

    /**
     * 実行中、もしくは待機中のタスクの数.
     */
    private static AtomicInteger activeTaskCnt = new AtomicInteger(0);

    /**
     * タスクを生成して返す.
     * @param no タスクの識別子、深さも表す
     * @return タスク
     */
    private static ForkJoinTask<?> createRecursiveTask(final String no) {
        return new RecursiveAction() {
            private static final long serialVersionUID = 1L;

            @Override
            protected void compute() {
                int numOfEntered = activeTaskCnt.incrementAndGet();
                log("[enter] activeTask=" + numOfEntered);
                try {
                    if (no.length() < maxDepth) {
                        // 最大深さに達していなければ、さらに10個の子タスクを生成する.
                        ForkJoinTask<?>[] tasks = new ForkJoinTask<?>[10];
                        for (int idx = 0; idx < tasks.length; idx++) {
                            tasks[idx] = createRecursiveTask(no + idx);
                        }
                        // 子タスクをすべて一括実行し、それを待機する.
                        // (内部的にはforkしてjoinしている)
                        invokeAll(tasks);
                    }
                } finally {
                    numOfEntered = activeTaskCnt.decrementAndGet();
                    log("[leave] activeTask=" + numOfEntered);
                }
            }

            /**
             * 診断メッセージ表示用
             * @param prefix
             */
            private void log(String prefix) {
                // 1つのスレッドの使われ方を見るために、特定のスレッドだけを診断メッセージを表示する。
                if (Thread.currentThread().toString().equals("Thread[ForkJoinPool-1-worker-1,5,main]")) {
                    StringBuilder buf = new StringBuilder();
                    buf.append(prefix);
                    buf.append(" no.").append(no);
                    buf.append(" :numOfActive=").append(getPool().getActiveThreadCount());
                    buf.append(" :poolSize=").append(getPool().getPoolSize());
                    buf.append(" :").append(Thread.currentThread());
                    System.out.println(buf.toString());
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // ForkJoinスレッドプール作成、デフォルトは論理CPU数
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 最初のタスクを生成し、完了を待つ
        forkJoinPool.invoke(createRecursiveTask("0"));

        // スレッドプールの終了
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done");
    }
}

実験結果

特定のスレッドの動きだけを確認するため、1つのスレッドのみにログを絞り込んでいる。

[enter] activeTask=1 no.0 :numOfActive=1 :poolSize=1 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=3 no.00 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=8 no.000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=18 no.0001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=22 no.0002 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0002 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0003 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0003 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=19 no.0004 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0004 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=19 no.0005 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0005 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0006 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=12 no.0006 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=14 no.0007 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0007 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0008 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=15 no.0008 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=14 no.0009 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=15 no.0009 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=10 no.000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=12 no.001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0010 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0010 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=18 no.0011 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0011 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0012 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0012 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0013 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0013 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=20 no.0014 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0014 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0015 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0015 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0016 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0016 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0017 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=12 no.0017 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0018 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0018 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=13 no.0019 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=9 no.0019 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=10 no.001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=9 no.0029 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=5 no.0029 :numOfActive=7 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=1 no.00 :numOfActive=1 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=0 no.0 :numOfActive=1 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
done

1つのスレッドで繰り返しcomputeメソッドがenterされていることが分かるようにタスクはjoinによって中断・再開されている。

ForkJoinTaskとRecursiveTask/RecursiveActionの違い

ForkJoinPoolに入れるタスクはForkJoinTask型であるが、実際にはRecursiveTaskかRecursiveActionのいずれかから派生するのが便利である。

RecursiveTaskは戻り値のあるタスク、RecursiveActionは戻り値が無いタスクを定義するのに使う。


この2つのクラスの実装は非常に単純で、その親であるForkJoinTaskが戻り値のあり・なしにかかわらず処理できるように汎用的になっているため、派生クラスで用途に応じて必要な部分だけを実装すれば良いように単純化しているだけの、単純なアダプタークラスにすぎない。

そのあたりはFork/Joinのメカニズムとは直接関係ない。

ForkJoinPoolのasyncモードでの利用

ForkJoinPoolを、論理CPU数分のスレッドに空きが出ないようにタスクを割り当てるスレッドプールとして利用することもできる。


通常、ForkJoinPoolに入れるタスクはfork/joinの利用に適したようにスタック式に積み上げられるが、
コンストラクタでasyncモードを指定することでイベント処理に適したFIFO式に設定することができる。


ForkJoinPoolは、タスク内で新たにタスクをforkする必要がなければ、普通なCallable, Runnableをタスクとして入れることができ、
また、Callable, RunnableをForkJoinTaskに変換するためのForkJoinTask.adapt()メソッドを使うこともできる。


すべてのタスクがCallable, Runnableのように内部で子タスクを作ることがなければ、ForkJoinPoolはasyncモードにしてタスクの実行順序をFIFOにしたほうが都合がよい場合が多いと思われる。

package jp.seraphyware.forkjoinsample;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,
                true); // タスクの積み上げにFIFOモードを使用する.(通常はSTACKモード)

        for (int idx = 0; idx < 20; idx++) {
            final int cnt = idx;
            pool.submit(new Runnable() { // 内部的にはForkJoinTask.adapt()でForkJoinTaskに変換している.
                @Override
                public void run() {
                    System.out.println("idx=" + cnt + "/"
                            + Thread.currentThread());
                    try {
                        Thread.sleep((long) (Math.random() * 100));
                    } catch (InterruptedException ex) {
                        // 何もしない
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done");
    }
}

この場合、タスクを入れた順に処理されることになる。
ただし、それぞれのタスクの完了までの時間によっては並列実行しているために順序が入れ替わることもありえる。

結論

fork/joinと同じやり方で、ThreadPoolExecutorでタスク内で自分自身のスレッドプールに新しいタスクを割り当て(submit)して、そのタスクをjoin(get等)した場合、1つのスレッドが割り当てられ、且つ、1つのスレッドが待ち状態となるため、あっという間に論理CPU数分のスレッドを使いきって、スレッドプールが枯渇すると思われる。


タスクをfork/joinという小さな粒度で分割しスレッドを効率的に使いまわす仕組みは、従来のExecutorのタスクの単位の制御とは全然違うし、従来のExecutorを直接使うのでは得られない効果である。


ForkJoinPoolを使うことで、たしかに分割統治的なアルゴリズムで並列処理させたい場合には、論理CPUの個数にかかわらず、単に再帰的な処理を記述するだけで並列処理が実現できるようになる。

ForkJoinPoolに再帰タスクを入れるだけで、スレッドの割り当てなどの面倒な処理はお任せできるようになっている。

これは自前で用意するのは、なかなか面倒なものだと思うので、Fork/Joinは大変有益なAPIであるのではないか、と思われる。


また、Java8からは新たに、CompletableFutureというタスク処理のクラスが追加されており、こちらは従来のタスク処理とForkJoinPoolの処理とを統合した、最終形態的な大量メンバを抱える多機能クラスとなっている。

CompletableFutureは、C#などで使われる継続タスク、あるいはjQueryのDeferredのような形でタスクを定義してゆくことができるようになっており、これを非同期タスクとして処理する場合は暗黙でForkJoinPoolを使うようになっているため、Java8では、ForkJoinPoolの活用の機会も増えそうである。

なお、JavaEE7からはjavax.enterprise.concurrent.ManagedExecutorServiceなどでスレッドの制御が可能になっているものの、残念なことに、ForkJoinPoolは対象外になっている。*1 *2

関連資料

*1:fork/joinの細かなタスクの切り替え処理が、JavaEEのスレッドコンテキストまわりの調整とかで難しかった、とか、そうゆう理由だろうか?

*2:JavaEEで、CompletableFutureを使う場合は、ForkJoinPool以外のExecutorを明示的に渡す必要がある、とのこと。

ForkJoinPoolにおけるタスクのブロッキングの実装方法 (2014/06/03)

ForkJoinPoolにおけるタスクのブロッキングの方法

先にForkJoinPoolの特性や使い方について調べたが、「同期処理」や、単純な「待ち」を含むタスクが、どのようになるのか調べてみた。


結論からいうと、同期をとる必要があるタスクは、Java8でサポートされるCompletableFutureを使って、タスクを細切れにして分割したり結合したりするのが、もっとも手っ取り早い。


しかし、ForkJoinPoolが持つメカニズムとしては、少し手間をかけることで自前で実装することも可能となっている。

単純にブロッキングした場合にForkJoinPoolはどうなるか?

ForkJoinPoolは、そのタスクの中でjoinすることでタスクを待ち状態にすると、ただちに空いたCPU資源を他のタスクに割り当てるようにスケジューリングする。

もちろん、スケジューラは、joinメソッドの中で、この切り替えを行っている。


では、たとえば単純に「Thread.sleep()」などでタスクを待ち状態にしたら、どうなるであろうか?


スケジューラはタスクから何も通知を受けることが無いため、実際にはスレッドは寝ているにもかかわらず、他のスレッドにCPUを割り当てることができない。

これは、synchronized/wait/notifyによるスレッドのブロックや、あるいは同期クラスを使ったスレッドのブロックも同様である。

もし、アクティブなスレッドがすべてブロックしたならば、それらのブロックが解除されるまでタスクは全く実行されなくなってしまうことになる。


したがって、ForkJoinTaskの中では、これらのスレッドを直接ブロックするような同期処理は使うべきではない。


もし、ForkJoinTaskの中でタスクのブロックが必要な場合には、ManagedBlockerを使うことができる。

これは、ブロッキングを行うべきか判定するメソッドと、実際にブロッキングを行うメソッドの2つのメソッドをもつ簡単なインターフェイスである。

ここに同期に必要な処理を記述したのちに、ForkJoinPool.managedBlock(java.util.concurrent.ForkJoinPool.ManagedBlocker)メソッドに渡すことで、ブロッキングが必要であればアクティブなタスクを切り替えることができるようにスケジューラと協調してブロッキングを行うことができるようになる。

ForkJoinTaskの中で協調的に動作するSleepの実装例
    // 任意の時間、スリープする管理されたブロッカーを構築する関数
    LongFunction<ManagedBlocker> managedSleepFactory =
            (long tim) -> {
                // 開始時間
                long st = System.currentTimeMillis();
                
                return new ManagedBlocker() {
                    @Override
                    public boolean block() throws rruptedException {
                        if (!isReleasable()) {
                            // まだ指定時間経過していなければ
                            // 10mSecスリープする.
                            // (最小分解能を10mSecとする)
                            Thread.sleep(10);

                            // まだスリープする必要があるか?
                            return isReleasable();
                        }
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        // 開始時間から指定時間が経過しているか?
                        long span = System.currentTimeMillis() - st;
                        return span > tim;
                    }
                };
            };
ForkJoinTaskの中で協調的に動作するSleepの使用例
    // タスクごとのループ回数
    int mxLoop = 5;

    // タスクを連続して生成する
    ForkJoinPool fjPool = ForkJoinPool.commonPool();
    List<ForkJoinTask<?>> tasks = new ArrayList<>();
    IntStream.rangeClosed(0, 14).forEach(idx -> {
        tasks.add(fjPool.submit(ForkJoinTask.adapt(()->{
            IntStream.rangeClosed(0, mxLoop).forEach(loop -> {
                // タスク内ループからログ出力する
                logWriter.accept(idx, loop);
                try {
                    // 一定時間スリープする
                    ForkJoinPool.managedBlock(managedSleepFactory.apply(200));
                    //Thread.sleep(200); // 管理されていないスリープ

                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                    // 無視する.
                }
            });
        })));
    });
    
    // すべてのタスクの完了を明示的に待つ
    // (すべてのスレッドがブロックしているとタスク完了前に
    // awaitTerminationが抜けてしまうため。Java8u4で確認。)
    tasks.forEach(ForkJoinTask::join);
    
    // ForkJoinPoolを終了する。
    fjPool.shutdown();
    fjPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
ログ出力用ヘルパ関数
    // タスクからのログを受け取る
    AtomicInteger cols = new AtomicInteger();
    BiConsumer<Integer, Integer> logWriter = (idx, loop) -> {
        String text = String.format("[%02d] %02d%s  ",
                idx, loop, (loop == mxLoop) ? '*' : ' ');
        synchronized (cols) {
            int col = cols.incrementAndGet() % 10; // 10個ごとに改行
            if (col == 0) {
                System.out.println(text);
            } else {
                System.out.print(text);
            }
        }
    };
実行結果

上記のコードを実行すると、タスクを連続して15個作成し、
それぞれのタスクで5回ログを出しつつ、1回ごとに200mSecのSleepを行う。


このとき、ManagedBlockerによって協調的にスリープするため、スリープしている間は他のタスクがアクティブとなる。


その結果、すべてのタスクが、あたかも同時実行的に並列に進行してゆくことが確認できる。

角括弧の中はタスク番号であり、「*」マークが1タスク中の最終処理を表していて、すべてのタスクの最終処理がまとまって処理されていることがわかる。

[05] 00   [04] 00   [00] 00   [06] 00   [02] 00   [01] 00   [03] 00   [07] 00   [08] 00   [09] 00   
[10] 00   [11] 00   [12] 00   [13] 00   [14] 00   [04] 01   [02] 01   [03] 01   [01] 01   [05] 01   
[00] 01   [06] 01   [11] 01   [07] 01   [08] 01   [09] 01   [10] 01   [12] 01   [13] 01   [14] 01   
[04] 02   [00] 02   [02] 02   [03] 02   [05] 02   [01] 02   [06] 02   [11] 02   [07] 02   [08] 02   
[09] 02   [10] 02   [13] 02   [12] 02   [14] 02   [04] 03   [05] 03   [03] 03   [02] 03   [00] 03   
[01] 03   [06] 03   [11] 03   [07] 03   [09] 03   [08] 03   [10] 03   [14] 03   [12] 03   [13] 03   
[05] 04   [01] 04   [03] 04   [04] 04   [02] 04   [00] 04   [06] 04   [11] 04   [07] 04   [09] 04   
[08] 04   [10] 04   [14] 04   [13] 04   [12] 04   [03] 05*  [05] 05*  [00] 05*  [01] 05*  [04] 05*  
[02] 05*  [06] 05*  [11] 05*  [07] 05*  [08] 05*  [09] 05*  [10] 05*  [12] 05*  [13] 05*  [14] 05*  


これに対して、コメントアウトしている「Thread.sleep(200)」のほうを有効とすると、タスクの切り替えは発生しないため、処理をはじめたタスクのスリープ解除と完了を待ってから次のタスクに進む、という効率の悪い動きとなる。

また、タスクの最終処理はタスクの処理順となっている。
直列的な動作となるため、当然、トータルの所要時間は前のものよりも長くなる。

(以下は、8個の論理CPUがあるマシンの場合の動きである。)

[01] 00   [03] 00   [05] 00   [00] 00   [06] 00   [04] 00   [02] 00   [03] 01   [01] 01   [02] 01   
[05] 01   [04] 01   [00] 01   [06] 01   [01] 02   [05] 02   [02] 02   [00] 02   [03] 02   [04] 02   
[06] 02   [06] 03   [00] 03   [01] 03   [03] 03   [05] 03   [02] 03   [04] 03   [03] 04   [02] 04   
[05] 04   [06] 04   [00] 04   [01] 04   [04] 04   [03] 05*  [01] 05*  [02] 05*  [05] 05*  [00] 05*  
[06] 05*  [04] 05*  [07] 00   [10] 00   [08] 00   [09] 00   [11] 00   [12] 00   [13] 00   [10] 01   
[11] 01   [07] 01   [09] 01   [12] 01   [08] 01   [13] 01   [09] 02   [07] 02   [10] 02   [08] 02   
[12] 02   [11] 02   [13] 02   [07] 03   [11] 03   [10] 03   [09] 03   [12] 03   [08] 03   [13] 03   
[12] 04   [10] 04   [07] 04   [11] 04   [08] 04   [09] 04   [13] 04   [08] 05*  [12] 05*  [10] 05*  
[09] 05*  [07] 05*  [11] 05*  [13] 05*  [14] 00   [14] 01   [14] 02   [14] 03   [14] 04   [14] 05*  


Java8のCompletableFutureを使う

複数のタスクに処理を分解し、それぞれのタスクの実行順序を決めるのであれば、CompletableFutureをつかうと、それらのタスクの実行順序や待ち合わせなどが手軽に実装できる。

3つのタスクを順番に実行するタスクをつくり、これらのタスクを一括して待ち合わせるタスク
    // 複数個の非同期フューチャーを生成して即時実行を開始する.
    @SuppressWarnings("unchecked")
    CompletableFuture<Void>[] cfs = IntStream.rangeClosed(0, 11).oObj(idx -> {
        return CompletableFuture.supplyAsync(() -> {
                // 一段目
                String msg = String.format("[%02d]job-A", idx);
                System.out.println(msg);
                uncheckedSleep.accept(1000);
                return msg;

            }).thenApplyAsync(prevResult -> {
                // 二段目
                String msg = String.format("[%02d]job-B: %s", idx, Result);
                System.out.println(msg);
                uncheckedSleep.accept(500);
                return msg;

            }).thenApplyAsync(prevResult -> {
                // 三段目
                String msg = String.format("[%02d]job-C: %s", idx, Result);
                System.out.println(msg);
                uncheckedSleep.accept(800);
                return msg;

            }).thenAcceptAsync(System.err::println);
        }).toArray(len -> new CompletableFuture[len]);

    // すべてのフューチャーの完了を待ち合わせる
    CompletableFuture<Void> jobs = CompletableFuture.allOf(cfs);
    System.out.println("waiting...");
    jobs.join();
    System.out.println("done!");

なお、CompletableFutureの非同期タスク(*async)系メソッドは、暗黙でForkJoinPoolの既定のプールを使用するので、上記コードは裏ではタスクはForkJoinPool上で動いている。

スリープ関数のヘルパ

スリープが検査例外を出すとラムダ式が面倒なことになるので、ちょっとヘルパを噛ませておく。

    // 検査例外を出さないスリープ
    LongConsumer uncheckedSleep = tm -> {
        try {
            ForkJoinPool.managedBlock(managedSleepFactory.apply(tm));
            //Thread.sleep(tm); // 管理されていないスリープ
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    };
実行結果

waiting...
[06]job-A
[04]job-A
[00]job-A
[01]job-A
[05]job-A
[02]job-A
[03]job-A
[07]job-A
[08]job-A
[09]job-A
[10]job-A
[11]job-A
[06]job-B: [06]job-A
[01]job-B: [01]job-A
[11]job-B: [11]job-A
[02]job-B: [02]job-A
[09]job-B: [09]job-A
[08]job-B: [08]job-A
[03]job-B: [03]job-A
[04]job-B: [04]job-A
[05]job-B: [05]job-A
[07]job-B: [07]job-A
[10]job-B: [10]job-A
[00]job-B: [00]job-A
[11]job-C: [11]job-B: [11]job-A
[06]job-C: [06]job-B: [06]job-A
[01]job-C: [01]job-B: [01]job-A
[03]job-C: [03]job-B: [03]job-A
[07]job-C: [07]job-B: [07]job-A
[02]job-C: [02]job-B: [02]job-A
[08]job-C: [08]job-B: [08]job-A
[09]job-C: [09]job-B: [09]job-A
[00]job-C: [00]job-B: [00]job-A
[10]job-C: [10]job-B: [10]job-A
[04]job-C: [04]job-B: [04]job-A
[05]job-C: [05]job-B: [05]job-A
[07]job-C: [07]job-B: [07]job-A
[08]job-C: [08]job-B: [08]job-A
[02]job-C: [02]job-B: [02]job-A
[06]job-C: [06]job-B: [06]job-A
[01]job-C: [01]job-B: [01]job-A
[03]job-C: [03]job-B: [03]job-A
[11]job-C: [11]job-B: [11]job-A
[00]job-C: [00]job-B: [00]job-A
[10]job-C: [10]job-B: [10]job-A
[05]job-C: [05]job-B: [05]job-A
[04]job-C: [04]job-B: [04]job-A
[09]job-C: [09]job-B: [09]job-A
done!

タスクが順序よく実行されていることが確認できる。

まとめ

  • ForkJoinTaskの中では直接ブロッキングするメソッドは使わない
  • ブロッキングが必要となる場合は ManagedBlocker を実装する
    • ただし、Java8で CompletableFuture を使えるなら、そのほうが簡単である。

以上、メモ終了