BlockingQueue
producer – consumer , publish – subscribeパターンの実装を用意するにするためのjava. util.concurrentパッケージのinterfaceです。
実態としては以下が存在し、すべてスレッドセーフで実装されています。
ArrayBlockingQueue | 配列ベースのblocking queue. 性質はLinkedBlockingQueueと同様だが、一般的にLinkedBlockingQueueに対して性能は劣る。 |
LinkedBlockingQueue | Queueのcapacity以内であればいつでも挿入可能。capacityを超えた場合は挿入を待機する。 |
LinkedTransferQueue | Consumerが待機状態となるまで挿入を待機する。(queue成長を防止可能) |
PriorityBlockingQueue | 指定した優先度に基づいてqueueから取得可能。 |
LinkedBlokingDeque | dequeの性質(キューの先端・終端のどちらからでも挿入・取得が可能)を持ちます。 |
DelayQueue | queueに挿入した要素は一定時間後に取り出し可能 |
最もオーソドックスなproducer – consumer パターンではLinkedBlockingQueueを使用します。
LinkedBlockingQueue
put動作
queueサイズがcapacityである場合、Producerはput時に待機します。queueサイズがcapacity未満になった場合(Consumerが取得した場合)、notifyされてputを実行します。
waitされたproducerが複数いた場合、最も長くwaitしたproducerから再開されることが保証されています。
コード例
Main
public class Main { private static ScheduledExecutorService cProducerService = Executors.newSingleThreadScheduledExecutor(); private static ScheduledExecutorService cConsumerService = Executors.newSingleThreadScheduledExecutor(); public static void main(String[] args){ BlockingQueue<Data> queue = new LinkedBlockingQueue<>(3); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); cProducerService.scheduleWithFixedDelay(producer, 0, 100, TimeUnit.MILLISECONDS); cConsumerService.scheduleWithFixedDelay(consumer, 0, 300, TimeUnit.MILLISECONDS); } }
Producer
public class Producer extends Thread{ private final BlockingQueue<Data> queue; private int number = 0; public Producer(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void run() { long start = System.nanoTime() / 1000 / 1000; try { queue.put(new Data(number)); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.nanoTime() / 1000 / 1000; System.out.println("put process time[ms]:" + (end - start) + "," + "put:" + number + ",queue size:" + queue.remainingCapacity()); number++; } }
Consumer
public class Consumer extends Thread{ private final BlockingQueue<Data> queue; public Consumer(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void run() { try { long start = System.nanoTime() / 1000 / 1000; Data data = queue.take(); long end = System.nanoTime() / 1000 / 1000; System.out.println("take process time[ms]:" + (end - start) + ",take:" + data + ",queue size:" + queue.remainingCapacity()); } catch (InterruptedException e) { e.printStackTrace(); } } }
Data
public class Data { private final int number; public Data(int number) { this.number = number; } public int getNumber() { return number; } @Override public String toString() { return "Data{" + "number=" + number + '}'; } }
実行結果
put process time[ms]:1,put:0,queue size:2 take process time[ms]:1,take:Data{number=0},queue size:3 put process time[ms]:0,put:1,queue size:2 put process time[ms]:1,put:2,queue size:1 take process time[ms]:0,take:Data{number=1},queue size:2 put process time[ms]:0,put:3,queue size:1 put process time[ms]:0,put:4,queue size:0 take process time[ms]:0,take:Data{number=2},queue size:1 put process time[ms]:78,put:5,queue size:0 take process time[ms]:0,take:Data{number=3},queue size:1 put process time[ms]:205,put:6,queue size:0 take process time[ms]:0,take:Data{number=4},queue size:1 put process time[ms]:204,put:7,queue size:0 put process time[ms]:204,put:8,queue size:0
capacityが3のqueueを作成し、Producer 100msec, Consuer 300msec周期でデータをput / takeします。
初めは順調にProducerはputしますが、5番目をputする段階でqueue capacityが上限に到達しているので、waitします。Consumerがtakeした後にwaitが解除され、Producerがput:5をしている様子を確認できます。(78msと通常よりも大幅に時間がかかっており、waitしていたことが分かる。)
*queueに実行時のqueueサイズを同時に取得する手段がないため、queue sizeはrace condition次第で実際とは異なる値が表示されます。
offer動作
queueサイズがcapacityである場合、Producerはoffer時にfalseを返却します。queueサイズがcapacity未満である場合、直ちに挿入しtrueを返却します。
なお、offer(E e, long timeout, TimeUnit unit)も存在し、こちらは一定時間のwaitを許容します。設定したtimeout時間を超過した場合、falseが返却されます。
コード例
Producerのみ変更しています。
public class Producer extends Thread{ private final BlockingQueue<Data> queue; private int number = 0; public Producer(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void run() { long start = System.nanoTime() / 1000; boolean result = queue.offer(new Data(number)); long end = System.nanoTime() / 1000; String str = result ? "put:" + number + ",queue size:" + queue.remainingCapacity() : "failed to offer:" + number; System.out.println("put process time[ms]:" + (end - start) + "," + str); number++; } }
実行結果
offer process time[ms]:319,offer:0,queue size:2 take process time[ms]:1,take:Data{number=0},queue size:3 offer process time[ms]:13,offer:1,queue size:2 offer process time[ms]:9,offer:2,queue size:1 take process time[ms]:0,take:Data{number=1},queue size:2 offer process time[ms]:6,offer:3,queue size:1 offer process time[ms]:6,offer:4,queue size:0 offer process time[ms]:3,failed to offer:5 take process time[ms]:0,take:Data{number=2},queue size:1 offer process time[ms]:15,offer:6,queue size:0
5をputしようとした際にfailedの文字が出ていることが分かります。
offerの即時実行性
なお、offer時の即時実行性については以下のようになります。
offer process time[ms]:3,offer:966,queue size:0 offer process time[ms]:1,failed to offer:967 offer process time[ms]:0,failed to offer:968 offer process time[ms]:3,offer:969,queue size:0 offer process time[ms]:1,failed to offer:970 offer process time[ms]:25,offer:971,queue size:0 offer process time[ms]:1,failed to offer:972 offer process time[ms]:0,failed to offer:973 offer process time[ms]:3,offer:974,queue size:0 offer process time[ms]:1,failed to offer:975
971の場合ですが、offer時に他と比較して処理時間が大きくなっています。
1.8におけるLinkedBlockingQueueの実装は以下のようになっています
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() == capacity) return false; enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
したがって、capacityがfullの場合は即座にofferはfalseを返却しますが、それ以降はputLockを必要とします。したがって、putLockが他に取得されている場合には、“即座に”処理が終了する訳ではありません。
putLockは例えばclear, remove, toArray等で使用されています。これらの処理は長時間の処理を必要とする操作ではありませんが、それでもQueueの処理量によっては致命的になることもあるので、即時返答性を期待して使用する際には注意が必要です。