seraphyの日記

日記というよりは過去を振り返るときのための単なる備忘録

ForkJoinPoolにおけるタスクのブロッキングの実装方法 (2014/06/03)

ForkJoinPoolにおけるタスクのブロッキングの方法

先にForkJoinPoolの特性や使い方について調べたが、「同期処理」や、単純な「待ち」を含むタスクが、どのようになるのか調べてみた。


結論からいうと、同期をとる必要があるタスクは、Java8でサポートされるCompletableFutureを使って、タスクを細切れにして分割したり結合したりするのが、もっとも手っ取り早い。


しかし、ForkJoinPoolが持つメカニズムとしては、少し手間をかけることで自前で実装することも可能となっている。

単純にブロッキングした場合にForkJoinPoolはどうなるか?

ForkJoinPoolは、そのタスクの中でjoinすることでタスクを待ち状態にすると、ただちに空いたCPU資源を他のタスクに割り当てるようにスケジューリングする。

もちろん、スケジューラは、joinメソッドの中で、この切り替えを行っている。


では、たとえば単純に「Thread.sleep()」などでタスクを待ち状態にしたら、どうなるであろうか?


スケジューラはタスクから何も通知を受けることが無いため、実際にはスレッドは寝ているにもかかわらず、他のスレッドにCPUを割り当てることができない。

これは、synchronized/wait/notifyによるスレッドのブロックや、あるいは同期クラスを使ったスレッドのブロックも同様である。

もし、アクティブなスレッドがすべてブロックしたならば、それらのブロックが解除されるまでタスクは全く実行されなくなってしまうことになる。


したがって、ForkJoinTaskの中では、これらのスレッドを直接ブロックするような同期処理は使うべきではない。


もし、ForkJoinTaskの中でタスクのブロックが必要な場合には、ManagedBlockerを使うことができる。

これは、ブロッキングを行うべきか判定するメソッドと、実際にブロッキングを行うメソッドの2つのメソッドをもつ簡単なインターフェイスである。

ここに同期に必要な処理を記述したのちに、ForkJoinPool.managedBlock(java.util.concurrent.ForkJoinPool.ManagedBlocker)メソッドに渡すことで、ブロッキングが必要であればアクティブなタスクを切り替えることができるようにスケジューラと協調してブロッキングを行うことができるようになる。

ForkJoinTaskの中で協調的に動作するSleepの実装例
    // 任意の時間、スリープする管理されたブロッカーを構築する関数
    LongFunction<ManagedBlocker> managedSleepFactory =
            (long tim) -> {
                // 開始時間
                long st = System.currentTimeMillis();
                
                return new ManagedBlocker() {
                    @Override
                    public boolean block() throws rruptedException {
                        if (!isReleasable()) {
                            // まだ指定時間経過していなければ
                            // 10mSecスリープする.
                            // (最小分解能を10mSecとする)
                            Thread.sleep(10);

                            // まだスリープする必要があるか?
                            return isReleasable();
                        }
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        // 開始時間から指定時間が経過しているか?
                        long span = System.currentTimeMillis() - st;
                        return span > tim;
                    }
                };
            };
ForkJoinTaskの中で協調的に動作するSleepの使用例
    // タスクごとのループ回数
    int mxLoop = 5;

    // タスクを連続して生成する
    ForkJoinPool fjPool = ForkJoinPool.commonPool();
    List<ForkJoinTask<?>> tasks = new ArrayList<>();
    IntStream.rangeClosed(0, 14).forEach(idx -> {
        tasks.add(fjPool.submit(ForkJoinTask.adapt(()->{
            IntStream.rangeClosed(0, mxLoop).forEach(loop -> {
                // タスク内ループからログ出力する
                logWriter.accept(idx, loop);
                try {
                    // 一定時間スリープする
                    ForkJoinPool.managedBlock(managedSleepFactory.apply(200));
                    //Thread.sleep(200); // 管理されていないスリープ

                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                    // 無視する.
                }
            });
        })));
    });
    
    // すべてのタスクの完了を明示的に待つ
    // (すべてのスレッドがブロックしているとタスク完了前に
    // awaitTerminationが抜けてしまうため。Java8u4で確認。)
    tasks.forEach(ForkJoinTask::join);
    
    // ForkJoinPoolを終了する。
    fjPool.shutdown();
    fjPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
ログ出力用ヘルパ関数
    // タスクからのログを受け取る
    AtomicInteger cols = new AtomicInteger();
    BiConsumer<Integer, Integer> logWriter = (idx, loop) -> {
        String text = String.format("[%02d] %02d%s  ",
                idx, loop, (loop == mxLoop) ? '*' : ' ');
        synchronized (cols) {
            int col = cols.incrementAndGet() % 10; // 10個ごとに改行
            if (col == 0) {
                System.out.println(text);
            } else {
                System.out.print(text);
            }
        }
    };
実行結果

上記のコードを実行すると、タスクを連続して15個作成し、
それぞれのタスクで5回ログを出しつつ、1回ごとに200mSecのSleepを行う。


このとき、ManagedBlockerによって協調的にスリープするため、スリープしている間は他のタスクがアクティブとなる。


その結果、すべてのタスクが、あたかも同時実行的に並列に進行してゆくことが確認できる。

角括弧の中はタスク番号であり、「*」マークが1タスク中の最終処理を表していて、すべてのタスクの最終処理がまとまって処理されていることがわかる。

[05] 00   [04] 00   [00] 00   [06] 00   [02] 00   [01] 00   [03] 00   [07] 00   [08] 00   [09] 00   
[10] 00   [11] 00   [12] 00   [13] 00   [14] 00   [04] 01   [02] 01   [03] 01   [01] 01   [05] 01   
[00] 01   [06] 01   [11] 01   [07] 01   [08] 01   [09] 01   [10] 01   [12] 01   [13] 01   [14] 01   
[04] 02   [00] 02   [02] 02   [03] 02   [05] 02   [01] 02   [06] 02   [11] 02   [07] 02   [08] 02   
[09] 02   [10] 02   [13] 02   [12] 02   [14] 02   [04] 03   [05] 03   [03] 03   [02] 03   [00] 03   
[01] 03   [06] 03   [11] 03   [07] 03   [09] 03   [08] 03   [10] 03   [14] 03   [12] 03   [13] 03   
[05] 04   [01] 04   [03] 04   [04] 04   [02] 04   [00] 04   [06] 04   [11] 04   [07] 04   [09] 04   
[08] 04   [10] 04   [14] 04   [13] 04   [12] 04   [03] 05*  [05] 05*  [00] 05*  [01] 05*  [04] 05*  
[02] 05*  [06] 05*  [11] 05*  [07] 05*  [08] 05*  [09] 05*  [10] 05*  [12] 05*  [13] 05*  [14] 05*  


これに対して、コメントアウトしている「Thread.sleep(200)」のほうを有効とすると、タスクの切り替えは発生しないため、処理をはじめたタスクのスリープ解除と完了を待ってから次のタスクに進む、という効率の悪い動きとなる。

また、タスクの最終処理はタスクの処理順となっている。
直列的な動作となるため、当然、トータルの所要時間は前のものよりも長くなる。

(以下は、8個の論理CPUがあるマシンの場合の動きである。)

[01] 00   [03] 00   [05] 00   [00] 00   [06] 00   [04] 00   [02] 00   [03] 01   [01] 01   [02] 01   
[05] 01   [04] 01   [00] 01   [06] 01   [01] 02   [05] 02   [02] 02   [00] 02   [03] 02   [04] 02   
[06] 02   [06] 03   [00] 03   [01] 03   [03] 03   [05] 03   [02] 03   [04] 03   [03] 04   [02] 04   
[05] 04   [06] 04   [00] 04   [01] 04   [04] 04   [03] 05*  [01] 05*  [02] 05*  [05] 05*  [00] 05*  
[06] 05*  [04] 05*  [07] 00   [10] 00   [08] 00   [09] 00   [11] 00   [12] 00   [13] 00   [10] 01   
[11] 01   [07] 01   [09] 01   [12] 01   [08] 01   [13] 01   [09] 02   [07] 02   [10] 02   [08] 02   
[12] 02   [11] 02   [13] 02   [07] 03   [11] 03   [10] 03   [09] 03   [12] 03   [08] 03   [13] 03   
[12] 04   [10] 04   [07] 04   [11] 04   [08] 04   [09] 04   [13] 04   [08] 05*  [12] 05*  [10] 05*  
[09] 05*  [07] 05*  [11] 05*  [13] 05*  [14] 00   [14] 01   [14] 02   [14] 03   [14] 04   [14] 05*  


Java8のCompletableFutureを使う

複数のタスクに処理を分解し、それぞれのタスクの実行順序を決めるのであれば、CompletableFutureをつかうと、それらのタスクの実行順序や待ち合わせなどが手軽に実装できる。

3つのタスクを順番に実行するタスクをつくり、これらのタスクを一括して待ち合わせるタスク
    // 複数個の非同期フューチャーを生成して即時実行を開始する.
    @SuppressWarnings("unchecked")
    CompletableFuture<Void>[] cfs = IntStream.rangeClosed(0, 11).oObj(idx -> {
        return CompletableFuture.supplyAsync(() -> {
                // 一段目
                String msg = String.format("[%02d]job-A", idx);
                System.out.println(msg);
                uncheckedSleep.accept(1000);
                return msg;

            }).thenApplyAsync(prevResult -> {
                // 二段目
                String msg = String.format("[%02d]job-B: %s", idx, Result);
                System.out.println(msg);
                uncheckedSleep.accept(500);
                return msg;

            }).thenApplyAsync(prevResult -> {
                // 三段目
                String msg = String.format("[%02d]job-C: %s", idx, Result);
                System.out.println(msg);
                uncheckedSleep.accept(800);
                return msg;

            }).thenAcceptAsync(System.err::println);
        }).toArray(len -> new CompletableFuture[len]);

    // すべてのフューチャーの完了を待ち合わせる
    CompletableFuture<Void> jobs = CompletableFuture.allOf(cfs);
    System.out.println("waiting...");
    jobs.join();
    System.out.println("done!");

なお、CompletableFutureの非同期タスク(*async)系メソッドは、暗黙でForkJoinPoolの既定のプールを使用するので、上記コードは裏ではタスクはForkJoinPool上で動いている。

スリープ関数のヘルパ

スリープが検査例外を出すとラムダ式が面倒なことになるので、ちょっとヘルパを噛ませておく。

    // 検査例外を出さないスリープ
    LongConsumer uncheckedSleep = tm -> {
        try {
            ForkJoinPool.managedBlock(managedSleepFactory.apply(tm));
            //Thread.sleep(tm); // 管理されていないスリープ
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    };
実行結果

waiting...
[06]job-A
[04]job-A
[00]job-A
[01]job-A
[05]job-A
[02]job-A
[03]job-A
[07]job-A
[08]job-A
[09]job-A
[10]job-A
[11]job-A
[06]job-B: [06]job-A
[01]job-B: [01]job-A
[11]job-B: [11]job-A
[02]job-B: [02]job-A
[09]job-B: [09]job-A
[08]job-B: [08]job-A
[03]job-B: [03]job-A
[04]job-B: [04]job-A
[05]job-B: [05]job-A
[07]job-B: [07]job-A
[10]job-B: [10]job-A
[00]job-B: [00]job-A
[11]job-C: [11]job-B: [11]job-A
[06]job-C: [06]job-B: [06]job-A
[01]job-C: [01]job-B: [01]job-A
[03]job-C: [03]job-B: [03]job-A
[07]job-C: [07]job-B: [07]job-A
[02]job-C: [02]job-B: [02]job-A
[08]job-C: [08]job-B: [08]job-A
[09]job-C: [09]job-B: [09]job-A
[00]job-C: [00]job-B: [00]job-A
[10]job-C: [10]job-B: [10]job-A
[04]job-C: [04]job-B: [04]job-A
[05]job-C: [05]job-B: [05]job-A
[07]job-C: [07]job-B: [07]job-A
[08]job-C: [08]job-B: [08]job-A
[02]job-C: [02]job-B: [02]job-A
[06]job-C: [06]job-B: [06]job-A
[01]job-C: [01]job-B: [01]job-A
[03]job-C: [03]job-B: [03]job-A
[11]job-C: [11]job-B: [11]job-A
[00]job-C: [00]job-B: [00]job-A
[10]job-C: [10]job-B: [10]job-A
[05]job-C: [05]job-B: [05]job-A
[04]job-C: [04]job-B: [04]job-A
[09]job-C: [09]job-B: [09]job-A
done!

タスクが順序よく実行されていることが確認できる。

まとめ

  • ForkJoinTaskの中では直接ブロッキングするメソッドは使わない
  • ブロッキングが必要となる場合は ManagedBlocker を実装する
    • ただし、Java8で CompletableFuture を使えるなら、そのほうが簡単である。

以上、メモ終了