Java7のPhaserクラスを利用した同期処理

Java7ではPhaserという同期処理用のクラスが追加になっています。PhaserはCyclicBarriarやCountDownLatch同様にスレッドの同期処理を行うために利用しますが、より柔軟性に富んでいるようです。

Phaserクラスの詳細はJavaDocを参照すればわかると思うので、今回も自分用のメモとして簡単な利用方法を示したサンプルプログラムを残します。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserTest {

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(new Task());
        tasks.add(new Task());
        tasks.add(new Task());
        tasks.add(new Task());
        runTask(tasks);
    }
    
    private static void runTask(List<Runnable> tasks) {
        // タスク数+1(メインスレッド)となるように初期値1でPhaserを生成
        final Phaser phaser = new Phaser(1);
        for (final Runnable t : tasks) {
            final String taskName = t.toString();
            // Phaserに登録
            phaser.register();
            new Thread() {
                public void run() {
                    System.out.println(taskName + " is waiting for the remaining tasks.");
                    // すべてのタスクの実行準備が整うまで待機
                    phaser.arriveAndAwaitAdvance();
                    System.out.println(taskName + " is ready to start.");
                    t.run();
                }
            }.start();
        }
        // 登録解除
        phaser.arriveAndDeregister();
        System.out.println("Phaser was deregistered.");
    }
   
    private static final class Task implements Runnable {

        private static final AtomicInteger idGenerator = new AtomicInteger();
        private final long id = idGenerator.incrementAndGet();
        
        @Override
        public void run() {
            System.out.println(toString() + " started.");
            try {
                Thread.sleep(getSleepTime());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(toString() + " stopped.");
        }
        
        @Override
        public String toString() {
            return "Task [id = " + id + "]";
        }
        
        private static long getSleepTime() {
            return ThreadLocalRandom.current().nextLong(100, 1000);
        }
    }

}

基本的なことはコメントとPhaserのJavaDocを読めばわかると思うのでポイントだけ簡単にまとめると、まずnew Phaser(1)でPhaserオブジェクトを生成し、for-eachループでタスク数分ループしながらPhaser#registerで未到達のパーティを登録します。

ループを抜けた時点でPhaserに登録されているパーティ数は5になっています。Phaser#getRegisteredPartiesメソッドで確認可能です。

その後、メインスレッド側で登録を解除します。(phaser.arriveAndDeregister())

この時点でパーティ数は4となります。ループ内で実行したスレッドはPhaser#arriveAndAwaitAdvanceというメソッドで登録したPhaser全てが当該ポイントまで到達するまで待機します。

メインスレッド側で登録解除を行った時点で、Phaser#arriveAndAwaitAdvanceに到達しているスレッドがいくつあっても関係はありません。

もしすべてのスレッドが当該地点まで到達して最後にメインスレッドが登録の解除を行ったならば待機中のスレッドが動作し始めます。

まだ未到達のスレッドがあった場合は、とりあえずメインスレッドは終了し、登録パーティ数が4となった状態で4スレッドが当該ポイントに到達した時点で待機しているスレッド含めすべてのスレッドが処理を開始します。

Phaserクラスにはこのほかにも様々なメソッドが用意されているので時間があるときにいろいろ実験してみようと思います。