seraphyの日記

日記というよりは過去を振り返るときのための単なる備忘録

ForkJoinPoolとForkJoinTaskの特徴と使い方

Fork/Joinとは?

JavaSE7でサポートされるjava.util.concurrent.ForkJoinPoolは、ExecutorServiceの一員であり、一見するとThreadPoolExecutorに似たようなものに思えるが、実際は全く別の、異質のものである。


ForkJoinPoolは、ありていに言えばWork-stealingアルゴリズムの実装であり、無数のタスクを無駄なく論理CPU分のスレッドに割り当てるものである。

ForkJoinPoolはデフォルトでは論理CPU数分のスレッドプールをもつように構築される。

そして、あるタスクが完了するか、もしくは待ちになったら、すぐに別のタスクがアクティブとなり、常に、論理CPU数分のスレッドだけがアクティブになるように調整される。


また、ForkJoinPoolに入れたForkJoinTaskは、そのタスクの中で新たなタスクをforkすると、同じForkJoinPool内にタスクを予約する。
このとき、タスクは既定ではStackのようにタスクが積まれるため、再帰的な計算をさせるのに向いている。


ForkJoinTaskのタスクは他のタスクと連携しあっており、タスク内でforkした子タスクをjoinして結果を待つ場合には、そのスレッドは「待ち」に入るのではなく、タスクが「中断」された状態となり、スレッドは他のタスクの処理に回される(Work-stealing)ようになっている。

(つまり、常に論理CPUに空きがでないようにタスクが割り当てられ、joinしても、スレッドは待ち状態にならない、ということでもある。)


このようなWork-stealing型の他の言語の処理ライブラリとしては、C/C++やFortanで広く使えるOpenMPや、MicrosoftのPPL(並列パターン ライブラリ)のようなものがある。


また、ForkJoinTaskでは、forkしてjoinする場合、最後にforkしたタスクがまだスレッドに割り当てられていなければ、join後に、そのまま同じスレッドでforkした処理を継続する、つまりスレッドを切り替える必要がない、といった最適化を施すことができるようになっているようである。



(なお、ForkJoinTask#invokeは、forkとjoinを同時に行うコンビニエンスメソッドである。)


このように、ForkJoinPoolでキモとなるのは、その名のとおり「fork」と「join」である。

とりわけjoinによってタスクが中断してスレッドが別のforkしたタスクの処理に回されるところが重要である。

実験コード

package jp.seraphyware.forkjoinsample;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    /**
     * タスク内からタスクを生成する最大深さ
     */
    private static final int maxDepth = 4;

    /**
     * 実行中、もしくは待機中のタスクの数.
     */
    private static AtomicInteger activeTaskCnt = new AtomicInteger(0);

    /**
     * タスクを生成して返す.
     * @param no タスクの識別子、深さも表す
     * @return タスク
     */
    private static ForkJoinTask<?> createRecursiveTask(final String no) {
        return new RecursiveAction() {
            private static final long serialVersionUID = 1L;

            @Override
            protected void compute() {
                int numOfEntered = activeTaskCnt.incrementAndGet();
                log("[enter] activeTask=" + numOfEntered);
                try {
                    if (no.length() < maxDepth) {
                        // 最大深さに達していなければ、さらに10個の子タスクを生成する.
                        ForkJoinTask<?>[] tasks = new ForkJoinTask<?>[10];
                        for (int idx = 0; idx < tasks.length; idx++) {
                            tasks[idx] = createRecursiveTask(no + idx);
                        }
                        // 子タスクをすべて一括実行し、それを待機する.
                        // (内部的にはforkしてjoinしている)
                        invokeAll(tasks);
                    }
                } finally {
                    numOfEntered = activeTaskCnt.decrementAndGet();
                    log("[leave] activeTask=" + numOfEntered);
                }
            }

            /**
             * 診断メッセージ表示用
             * @param prefix
             */
            private void log(String prefix) {
                // 1つのスレッドの使われ方を見るために、特定のスレッドだけを診断メッセージを表示する。
                if (Thread.currentThread().toString().equals("Thread[ForkJoinPool-1-worker-1,5,main]")) {
                    StringBuilder buf = new StringBuilder();
                    buf.append(prefix);
                    buf.append(" no.").append(no);
                    buf.append(" :numOfActive=").append(getPool().getActiveThreadCount());
                    buf.append(" :poolSize=").append(getPool().getPoolSize());
                    buf.append(" :").append(Thread.currentThread());
                    System.out.println(buf.toString());
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // ForkJoinスレッドプール作成、デフォルトは論理CPU数
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 最初のタスクを生成し、完了を待つ
        forkJoinPool.invoke(createRecursiveTask("0"));

        // スレッドプールの終了
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done");
    }
}

実験結果

特定のスレッドの動きだけを確認するため、1つのスレッドのみにログを絞り込んでいる。

[enter] activeTask=1 no.0 :numOfActive=1 :poolSize=1 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=3 no.00 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=8 no.000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=18 no.0001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=22 no.0002 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0002 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0003 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0003 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=19 no.0004 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0004 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=19 no.0005 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0005 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0006 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=12 no.0006 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=14 no.0007 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0007 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0008 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=15 no.0008 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=14 no.0009 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=15 no.0009 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=10 no.000 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=12 no.001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=16 no.0010 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0010 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=18 no.0011 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0011 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0012 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0012 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0013 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=17 no.0013 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=20 no.0014 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0014 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=17 no.0015 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=16 no.0015 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0016 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0016 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0017 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=12 no.0017 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=15 no.0018 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=13 no.0018 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=13 no.0019 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=9 no.0019 :numOfActive=9 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=10 no.001 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[enter] activeTask=9 no.0029 :numOfActive=8 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=5 no.0029 :numOfActive=7 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=1 no.00 :numOfActive=1 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
[leave] activeTask=0 no.0 :numOfActive=1 :poolSize=8 :Thread[ForkJoinPool-1-worker-1,5,main]
done

1つのスレッドで繰り返しcomputeメソッドがenterされていることが分かるようにタスクはjoinによって中断・再開されている。

ForkJoinTaskとRecursiveTask/RecursiveActionの違い

ForkJoinPoolに入れるタスクはForkJoinTask型であるが、実際にはRecursiveTaskかRecursiveActionのいずれかから派生するのが便利である。

RecursiveTaskは戻り値のあるタスク、RecursiveActionは戻り値が無いタスクを定義するのに使う。


この2つのクラスの実装は非常に単純で、その親であるForkJoinTaskが戻り値のあり・なしにかかわらず処理できるように汎用的になっているため、派生クラスで用途に応じて必要な部分だけを実装すれば良いように単純化しているだけの、単純なアダプタークラスにすぎない。

そのあたりはFork/Joinのメカニズムとは直接関係ない。

ForkJoinPoolのasyncモードでの利用

ForkJoinPoolを、論理CPU数分のスレッドに空きが出ないようにタスクを割り当てるスレッドプールとして利用することもできる。


通常、ForkJoinPoolに入れるタスクはfork/joinの利用に適したようにスタック式に積み上げられるが、
コンストラクタでasyncモードを指定することでイベント処理に適したFIFO式に設定することができる。


ForkJoinPoolは、タスク内で新たにタスクをforkする必要がなければ、普通なCallable, Runnableをタスクとして入れることができ、
また、Callable, RunnableをForkJoinTaskに変換するためのForkJoinTask.adapt()メソッドを使うこともできる。


すべてのタスクがCallable, Runnableのように内部で子タスクを作ることがなければ、ForkJoinPoolはasyncモードにしてタスクの実行順序をFIFOにしたほうが都合がよい場合が多いと思われる。

package jp.seraphyware.forkjoinsample;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,
                true); // タスクの積み上げにFIFOモードを使用する.(通常はSTACKモード)

        for (int idx = 0; idx < 20; idx++) {
            final int cnt = idx;
            pool.submit(new Runnable() { // 内部的にはForkJoinTask.adapt()でForkJoinTaskに変換している.
                @Override
                public void run() {
                    System.out.println("idx=" + cnt + "/"
                            + Thread.currentThread());
                    try {
                        Thread.sleep((long) (Math.random() * 100));
                    } catch (InterruptedException ex) {
                        // 何もしない
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done");
    }
}

この場合、タスクを入れた順に処理されることになる。
ただし、それぞれのタスクの完了までの時間によっては並列実行しているために順序が入れ替わることもありえる。

結論

fork/joinと同じやり方で、ThreadPoolExecutorでタスク内で自分自身のスレッドプールに新しいタスクを割り当て(submit)して、そのタスクをjoin(get等)した場合、1つのスレッドが割り当てられ、且つ、1つのスレッドが待ち状態となるため、あっという間に論理CPU数分のスレッドを使いきって、スレッドプールが枯渇すると思われる。


タスクをfork/joinという小さな粒度で分割しスレッドを効率的に使いまわす仕組みは、従来のExecutorのタスクの単位の制御とは全然違うし、従来のExecutorを直接使うのでは得られない効果である。


ForkJoinPoolを使うことで、たしかに分割統治的なアルゴリズムで並列処理させたい場合には、論理CPUの個数にかかわらず、単に再帰的な処理を記述するだけで並列処理が実現できるようになる。

ForkJoinPoolに再帰タスクを入れるだけで、スレッドの割り当てなどの面倒な処理はお任せできるようになっている。

これは自前で用意するのは、なかなか面倒なものだと思うので、Fork/Joinは大変有益なAPIであるのではないか、と思われる。


また、Java8からは新たに、CompletableFutureというタスク処理のクラスが追加されており、こちらは従来のタスク処理とForkJoinPoolの処理とを統合した、最終形態的な大量メンバを抱える多機能クラスとなっている。

CompletableFutureは、C#などで使われる継続タスク、あるいはjQueryのDeferredのような形でタスクを定義してゆくことができるようになっており、これを非同期タスクとして処理する場合は暗黙でForkJoinPoolを使うようになっているため、Java8では、ForkJoinPoolの活用の機会も増えそうである。

なお、JavaEE7からはjavax.enterprise.concurrent.ManagedExecutorServiceなどでスレッドの制御が可能になっているものの、残念なことに、ForkJoinPoolは対象外になっている。*1 *2

関連資料

*1:fork/joinの細かなタスクの切り替え処理が、JavaEEのスレッドコンテキストまわりの調整とかで難しかった、とか、そうゆう理由だろうか?

*2:JavaEEで、CompletableFutureを使う場合は、ForkJoinPool以外のExecutorを明示的に渡す必要がある、とのこと。