TransferQueueについて

Java7ではjava.util.concurrentパッケージに新たにTransferQueueが追加になりました。このキューを利用するとプロデューサがデータをTransferQueueに送信すると、コンシューマがそれを取り出すまでプロデューサは待たされることになるようです。

簡単なプログラムでその動きを示します。

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class Main {
    
    private static final TransferQueue<String> queue = new LinkedTransferQueue<>();

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(new Producer());
        Thread consumer = new Thread(new Consumer());
        
        producer.start();
        consumer.start();
    }
    
    static class Producer implements Runnable {
        
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                String text = "message no." + Integer.toString(i);
                System.out.println("transfer " + text);
                try {
                    // メッセージがConsumerによって処理されるまで待つ
                    queue.transfer(text);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
            try {
                queue.transfer("message end");
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            System.out.println("producer is shutting down.");
        }
    }
    
    static class Consumer implements Runnable {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                String text = null;
                try {
                    Thread.sleep(1000);
                    text = queue.take();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
                if (text.equals("message end")) {
                   break;
                } else {
                    System.out.println("remove " + text);
                }
            }
            System.out.println("consumer is shutting down.");
        }
    }
}

プログラムは2つのスレッドがあり、Producerはメッセージに0〜9までの連番を付与し、TransferQueueに送信しています。ConsumerはProducerよりTransferQueueに送信されたメッセージを取り出し、画面に出力します。Producerは最後に終了を表す"message end"を送信しConsumerがそのメッセージを受信すると、それぞれのスレッドが後処理をして終了します。

このプログラムの実行結果は次のようになります。

transfer message no.0
transfer message no.1
remove message no.0
remove message no.1
transfer message no.2
remove message no.2
transfer message no.3
remove message no.3
transfer message no.4
remove message no.4
transfer message no.5
remove message no.5
transfer message no.6
remove message no.6
transfer message no.7
remove message no.7
transfer message no.8
remove message no.8
transfer message no.9
remove message no.9
consumer is shutting down.
producer is shutting down.

必ずProducerがメッセージを最初に送信し、送信したメッセージがConsumerによって処理された後次の番号のメッセージを送信していることが分かると思います。

TransferQueueはインターフェースでその実装がLinkedTransferQueueになります。

TransferQueueにはtransfer以外にもメソッドがあります。tryTransferやhasWaitingConsumer、getWaitingConsumerCountです。

例えば、hasWaitingConsumerはキューに対してデキュー待ちのスレッドがいるかどうかをtrue/falseで返し、getWaitingConsumerCountある時点でのデキュー待ちのコンシューマの数を返すようです。(詳細はJavaDocで確認してください。)

今のところ使いどころは思いつかないけど、まあ、確認程度に・・・。