BlockingQueue
producer – consumer , publish – subscribeパターンの実装を用意するにするためのjava. util.concurrentパッケージのinterfaceです。
実態としては以下が存在し、すべてスレッドセーフで実装されています。
| ArrayBlockingQueue | 配列ベースのblocking queue. 性質はLinkedBlockingQueueと同様だが、一般的にLinkedBlockingQueueに対して性能は劣る。 |
| LinkedBlockingQueue | Queueのcapacity以内であればいつでも挿入可能。capacityを超えた場合は挿入を待機する。 |
| LinkedTransferQueue | Consumerが待機状態となるまで挿入を待機する。(queue成長を防止可能) |
| PriorityBlockingQueue | 指定した優先度に基づいてqueueから取得可能。 |
| LinkedBlokingDeque | dequeの性質(キューの先端・終端のどちらからでも挿入・取得が可能)を持ちます。 |
| DelayQueue | queueに挿入した要素は一定時間後に取り出し可能 |
最もオーソドックスなproducer – consumer パターンではLinkedBlockingQueueを使用します。
LinkedBlockingQueue
put動作
queueサイズがcapacityである場合、Producerはput時に待機します。queueサイズがcapacity未満になった場合(Consumerが取得した場合)、notifyされてputを実行します。
waitされたproducerが複数いた場合、最も長くwaitしたproducerから再開されることが保証されています。
コード例
Main
public class Main {
private static ScheduledExecutorService cProducerService = Executors.newSingleThreadScheduledExecutor();
private static ScheduledExecutorService cConsumerService = Executors.newSingleThreadScheduledExecutor();
public static void main(String[] args){
BlockingQueue<Data> queue = new LinkedBlockingQueue<>(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
cProducerService.scheduleWithFixedDelay(producer, 0, 100, TimeUnit.MILLISECONDS);
cConsumerService.scheduleWithFixedDelay(consumer, 0, 300, TimeUnit.MILLISECONDS);
}
}
Producer
public class Producer extends Thread{
private final BlockingQueue<Data> queue;
private int number = 0;
public Producer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
long start = System.nanoTime() / 1000 / 1000;
try {
queue.put(new Data(number));
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.nanoTime() / 1000 / 1000;
System.out.println("put process time[ms]:" + (end - start) + "," + "put:" + number + ",queue size:" + queue.remainingCapacity());
number++;
}
}
Consumer
public class Consumer extends Thread{
private final BlockingQueue<Data> queue;
public Consumer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
long start = System.nanoTime() / 1000 / 1000;
Data data = queue.take();
long end = System.nanoTime() / 1000 / 1000;
System.out.println("take process time[ms]:" + (end - start) + ",take:" + data + ",queue size:" + queue.remainingCapacity());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Data
public class Data {
private final int number;
public Data(int number) {
this.number = number;
}
public int getNumber() {
return number;
}
@Override
public String toString() {
return "Data{" +
"number=" + number +
'}';
}
}
実行結果
put process time[ms]:1,put:0,queue size:2
take process time[ms]:1,take:Data{number=0},queue size:3
put process time[ms]:0,put:1,queue size:2
put process time[ms]:1,put:2,queue size:1
take process time[ms]:0,take:Data{number=1},queue size:2
put process time[ms]:0,put:3,queue size:1
put process time[ms]:0,put:4,queue size:0
take process time[ms]:0,take:Data{number=2},queue size:1
put process time[ms]:78,put:5,queue size:0
take process time[ms]:0,take:Data{number=3},queue size:1
put process time[ms]:205,put:6,queue size:0
take process time[ms]:0,take:Data{number=4},queue size:1
put process time[ms]:204,put:7,queue size:0
put process time[ms]:204,put:8,queue size:0
capacityが3のqueueを作成し、Producer 100msec, Consuer 300msec周期でデータをput / takeします。
初めは順調にProducerはputしますが、5番目をputする段階でqueue capacityが上限に到達しているので、waitします。Consumerがtakeした後にwaitが解除され、Producerがput:5をしている様子を確認できます。(78msと通常よりも大幅に時間がかかっており、waitしていたことが分かる。)
*queueに実行時のqueueサイズを同時に取得する手段がないため、queue sizeはrace condition次第で実際とは異なる値が表示されます。
offer動作
queueサイズがcapacityである場合、Producerはoffer時にfalseを返却します。queueサイズがcapacity未満である場合、直ちに挿入しtrueを返却します。
なお、offer(E e, long timeout, TimeUnit unit)も存在し、こちらは一定時間のwaitを許容します。設定したtimeout時間を超過した場合、falseが返却されます。
コード例
Producerのみ変更しています。
public class Producer extends Thread{
private final BlockingQueue<Data> queue;
private int number = 0;
public Producer(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void run() {
long start = System.nanoTime() / 1000;
boolean result = queue.offer(new Data(number));
long end = System.nanoTime() / 1000;
String str = result ? "put:" + number + ",queue size:" + queue.remainingCapacity() : "failed to offer:" + number;
System.out.println("put process time[ms]:" + (end - start) + "," + str);
number++;
}
}
実行結果
offer process time[ms]:319,offer:0,queue size:2
take process time[ms]:1,take:Data{number=0},queue size:3
offer process time[ms]:13,offer:1,queue size:2
offer process time[ms]:9,offer:2,queue size:1
take process time[ms]:0,take:Data{number=1},queue size:2
offer process time[ms]:6,offer:3,queue size:1
offer process time[ms]:6,offer:4,queue size:0
offer process time[ms]:3,failed to offer:5
take process time[ms]:0,take:Data{number=2},queue size:1
offer process time[ms]:15,offer:6,queue size:0
5をputしようとした際にfailedの文字が出ていることが分かります。
offerの即時実行性
なお、offer時の即時実行性については以下のようになります。
offer process time[ms]:3,offer:966,queue size:0 offer process time[ms]:1,failed to offer:967 offer process time[ms]:0,failed to offer:968 offer process time[ms]:3,offer:969,queue size:0 offer process time[ms]:1,failed to offer:970 offer process time[ms]:25,offer:971,queue size:0 offer process time[ms]:1,failed to offer:972 offer process time[ms]:0,failed to offer:973 offer process time[ms]:3,offer:974,queue size:0 offer process time[ms]:1,failed to offer:975
971の場合ですが、offer時に他と比較して処理時間が大きくなっています。
1.8におけるLinkedBlockingQueueの実装は以下のようになっています
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
final int c;
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() == capacity)
return false;
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
したがって、capacityがfullの場合は即座にofferはfalseを返却しますが、それ以降はputLockを必要とします。したがって、putLockが他に取得されている場合には、“即座に”処理が終了する訳ではありません。
putLockは例えばclear, remove, toArray等で使用されています。これらの処理は長時間の処理を必要とする操作ではありませんが、それでもQueueの処理量によっては致命的になることもあるので、即時返答性を期待して使用する際には注意が必要です。
