スポンサーリンク

【Java】LinkedBlockingQueue【特徴まとめ】

スポンサーリンク

BlockingQueue

producer – consumer , publish – subscribeパターンの実装を用意するにするためのjava. util.concurrentパッケージのinterfaceです。

実態としては以下が存在し、すべてスレッドセーフで実装されています。

ArrayBlockingQueue 配列ベースのblocking queue. 性質はLinkedBlockingQueueと同様だが、一般的にLinkedBlockingQueueに対して性能は劣る。
LinkedBlockingQueue Queueのcapacity以内であればいつでも挿入可能。capacityを超えた場合は挿入を待機する。
LinkedTransferQueue Consumerが待機状態となるまで挿入を待機する。(queue成長を防止可能)
PriorityBlockingQueue 指定した優先度に基づいてqueueから取得可能。
LinkedBlokingDeque dequeの性質(キューの先端・終端のどちらからでも挿入・取得が可能)を持ちます。
DelayQueue queueに挿入した要素は一定時間後に取り出し可能

最もオーソドックスなproducer – consumer パターンではLinkedBlockingQueueを使用します。

LinkedBlockingQueue

put動作

queueサイズがcapacityである場合、Producerはput時に待機します。queueサイズがcapacity未満になった場合(Consumerが取得した場合)、notifyされてputを実行します。

waitされたproducerが複数いた場合、最も長くwaitしたproducerから再開されることが保証されています。

コード例

Main

public class Main {
    private static ScheduledExecutorService cProducerService = Executors.newSingleThreadScheduledExecutor();
    private static ScheduledExecutorService cConsumerService = Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args){
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        cProducerService.scheduleWithFixedDelay(producer, 0, 100, TimeUnit.MILLISECONDS);
        cConsumerService.scheduleWithFixedDelay(consumer, 0, 300, TimeUnit.MILLISECONDS);
    }
}

Producer

public class Producer extends Thread{
    private final BlockingQueue<Data> queue;
    private int number = 0;

    public Producer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        long start = System.nanoTime() / 1000 / 1000;
        try {
            queue.put(new Data(number));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.nanoTime() / 1000 / 1000;
        System.out.println("put process time[ms]:" + (end - start) + "," + "put:" + number + ",queue size:" + queue.remainingCapacity());
        number++;
    }
}

Consumer

public class Consumer extends Thread{
    private final BlockingQueue<Data> queue;

    public Consumer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            long start = System.nanoTime() / 1000 / 1000;
            Data data = queue.take();
            long end = System.nanoTime() / 1000 / 1000;
            System.out.println("take process time[ms]:" + (end - start) + ",take:" + data + ",queue size:" + queue.remainingCapacity());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Data

public class Data {
    private final int number;

    public Data(int number) {
        this.number = number;
    }

    public int getNumber() {
        return number;
    }

    @Override
    public String toString() {
        return "Data{" +
                "number=" + number +
                '}';
    }
}

実行結果

 put process time[ms]:1,put:0,queue size:2
 take process time[ms]:1,take:Data{number=0},queue size:3
 put process time[ms]:0,put:1,queue size:2
 put process time[ms]:1,put:2,queue size:1
 take process time[ms]:0,take:Data{number=1},queue size:2
 put process time[ms]:0,put:3,queue size:1
 put process time[ms]:0,put:4,queue size:0
 take process time[ms]:0,take:Data{number=2},queue size:1
 put process time[ms]:78,put:5,queue size:0
 take process time[ms]:0,take:Data{number=3},queue size:1
 put process time[ms]:205,put:6,queue size:0
 take process time[ms]:0,take:Data{number=4},queue size:1
 put process time[ms]:204,put:7,queue size:0
 put process time[ms]:204,put:8,queue size:0

capacityが3のqueueを作成し、Producer 100msec, Consuer 300msec周期でデータをput / takeします。

初めは順調にProducerはputしますが、5番目をputする段階でqueue capacityが上限に到達しているので、waitします。Consumerがtakeした後にwaitが解除され、Producerがput:5をしている様子を確認できます。(78msと通常よりも大幅に時間がかかっており、waitしていたことが分かる。)

*queueに実行時のqueueサイズを同時に取得する手段がないため、queue sizeはrace condition次第で実際とは異なる値が表示されます。

offer動作

queueサイズがcapacityである場合、Producerはoffer時にfalseを返却します。queueサイズがcapacity未満である場合、直ちに挿入しtrueを返却します。

なお、offer(E e, long timeout, TimeUnit unit)も存在し、こちらは一定時間のwaitを許容します。設定したtimeout時間を超過した場合、falseが返却されます。

コード例

Producerのみ変更しています。

public class Producer extends Thread{
    private final BlockingQueue<Data> queue;
    private int number = 0;

    public Producer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        long start = System.nanoTime() / 1000;
        boolean result = queue.offer(new Data(number));
        long end = System.nanoTime() / 1000;
        String str = result ? "put:" + number + ",queue size:" + queue.remainingCapacity() : "failed to offer:" + number;
        System.out.println("put process time[ms]:" + (end - start) + "," + str);
        number++;
    }

}

実行結果

 offer process time[ms]:319,offer:0,queue size:2
 take process time[ms]:1,take:Data{number=0},queue size:3
 offer process time[ms]:13,offer:1,queue size:2
 offer process time[ms]:9,offer:2,queue size:1
 take process time[ms]:0,take:Data{number=1},queue size:2
 offer process time[ms]:6,offer:3,queue size:1
 offer process time[ms]:6,offer:4,queue size:0
 offer process time[ms]:3,failed to offer:5
 take process time[ms]:0,take:Data{number=2},queue size:1
 offer process time[ms]:15,offer:6,queue size:0

5をputしようとした際にfailedの文字が出ていることが分かります。

offerの即時実行性

なお、offer時の即時実行性については以下のようになります。

offer process time[ms]:3,offer:966,queue size:0
 offer process time[ms]:1,failed to offer:967
 offer process time[ms]:0,failed to offer:968
 offer process time[ms]:3,offer:969,queue size:0
 offer process time[ms]:1,failed to offer:970
 offer process time[ms]:25,offer:971,queue size:0
 offer process time[ms]:1,failed to offer:972
 offer process time[ms]:0,failed to offer:973
 offer process time[ms]:3,offer:974,queue size:0
 offer process time[ms]:1,failed to offer:975

971の場合ですが、offer時に他と比較して処理時間が大きくなっています。

1.8におけるLinkedBlockingQueueの実装は以下のようになっています

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    final int c;
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() == capacity)
            return false;
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

したがって、capacityがfullの場合は即座にofferはfalseを返却しますが、それ以降はputLockを必要とします。したがって、putLockが他に取得されている場合には、“即座に”処理が終了する訳ではありません。

putLockは例えばclear, remove, toArray等で使用されています。これらの処理は長時間の処理を必要とする操作ではありませんが、それでもQueueの処理量によっては致命的になることもあるので、即時返答性を期待して使用する際には注意が必要です。

タイトルとURLをコピーしました