BlockingQueue usage (producer consumer)
The Java BlockingQueue interface in the java.util.concurrent package represents a thread-safe queue into which objects can be inserted and from which they can be taken.
In this article, I will show you how to use a BlockingQueue.
This article does not discuss how to implement a BlockingQueue in Java. If you are interested in that, there is an article on blocking queues in the more theoretical part of my Java concurrency tutorial.
BlockingQueue usage
A BlockingQueue is typically used to have one thread produce objects that another thread consumes.
The producing thread keeps producing new objects and inserting them into the queue until the queue reaches its capacity, the upper bound on how many elements it can hold. Once the queue is full, the producing thread blocks when it tries to insert another object, and it stays blocked until a consuming thread takes an object out of the queue.
The consuming thread keeps taking objects out of the blocking queue and processing them. If the consuming thread tries to take an object out of an empty queue, it blocks until a producing thread puts an object into the queue.
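The blocking behavior described above can be sketched with a deliberately small queue. This is a minimal illustration, not part of the original example: the class name BoundedHandoffSketch, the method transfer, and the capacity of 2 are assumptions chosen for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this sketch.
public class BoundedHandoffSketch {

    // Runs one producer and one consumer over a queue that holds at most
    // `capacity` elements and returns what the consumer received, in order.
    static List<Integer> transfer(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(capacity);
        List<Integer> received = new ArrayList<Integer>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            // Wait for both threads; join() also makes `received` safe to read.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        // Five elements pass through a queue with room for only two.
        System.out.println(transfer(2, 5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because the queue holds only two elements, the producer cannot run far ahead of the consumer: each thread waits for the other whenever the queue is full or empty, and no explicit locking is needed in the calling code.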
BlockingQueue methods
A BlockingQueue has four different sets of methods for inserting, removing, and examining the elements in the queue. Each set behaves differently when the requested operation cannot be carried out immediately. Here is a table of the methods:
| | Throws exception | Special value | Blocks | Times out |
|---|---|---|---|---|
| Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| Remove | remove() | poll() | take() | poll(timeout, timeunit) |
| Examine | element() | peek() | | |
The four behaviors mean the following:
- Throws exception: if the attempted operation cannot be performed immediately, an exception is thrown.
- Special value: if the attempted operation cannot be performed immediately, a special value is returned (false for offer, null for poll and peek).
- Blocks: if the attempted operation cannot be performed immediately, the method call blocks until it can.
- Times out: if the attempted operation cannot be performed immediately, the method call blocks until it can, but waits no longer than the given timeout. It then returns a special value telling whether the operation succeeded (false for offer, null for poll).
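The non-blocking behaviors can be observed on a queue that is full or empty. The following is a small sketch under stated assumptions: the class name QueueBehaviorSketch, its helper methods, and the 100 millisecond timeout are illustrative choices. The put() and take() methods are left out on purpose, because in a single thread they would block forever.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this sketch.
public class QueueBehaviorSketch {

    // Tries each insert variant on a queue that is already full.
    static String insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.add("a"); // succeeds: there is room for exactly one element

        StringBuilder result = new StringBuilder();
        try {
            queue.add("b");
        } catch (IllegalStateException e) {
            result.append("add threw; ");
        }
        result.append("offer=").append(queue.offer("b")).append("; ");
        try {
            // Waits up to 100 ms for free space, then gives up.
            boolean ok = queue.offer("b", 100, TimeUnit.MILLISECONDS);
            result.append("timed offer=").append(ok);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.toString();
    }

    // Tries each remove variant on an empty queue.
    static String removeFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);

        StringBuilder result = new StringBuilder();
        try {
            queue.remove();
        } catch (NoSuchElementException e) {
            result.append("remove threw; ");
        }
        result.append("poll=").append(queue.poll()).append("; ");
        try {
            // Waits up to 100 ms for an element, then returns null.
            result.append("timed poll=").append(queue.poll(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.toString();
    }

    // Tries both examine variants on an empty queue.
    static String examineEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);

        StringBuilder result = new StringBuilder();
        try {
            queue.element();
        } catch (NoSuchElementException e) {
            result.append("element threw; ");
        }
        result.append("peek=").append(queue.peek());
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());  // add threw; offer=false; timed offer=false
        System.out.println(removeFromEmptyQueue()); // remove threw; poll=null; timed poll=null
        System.out.println(examineEmptyQueue());    // element threw; peek=null
    }
}
```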
It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue throws a NullPointerException.
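A short check makes the null restriction concrete. The class and method names below are made up for this sketch. The restriction exists because poll() and peek() use null as the special value meaning that no element is available, so a stored null would be ambiguous.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this sketch.
public class NullElementSketch {

    // Returns true if the queue refuses a null element with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(4);
        try {
            queue.offer(null);
            return false; // not reached for ArrayBlockingQueue
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull()); // prints true
    }
}
```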
You can also access all the elements inside a BlockingQueue, not just those at the head and tail. For example, suppose an object has been queued for processing, but your application then decides to cancel it. You can call remove(o) to remove that specific object from the queue. However, this is not very efficient, so you should not use these Collection methods unless you really have to.
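As a rough sketch of that cancellation scenario, the snippet below removes one queued element by value. The class name, the cancel method, and the job names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class name, used only for this sketch.
public class RemoveSpecificSketch {

    // Queues three jobs, cancels one by value, and returns what is still queued.
    static List<String> cancel(String job) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(8);
        queue.add("job-1");
        queue.add("job-2");
        queue.add("job-3");

        // remove(Object) searches the queue for an equal element and
        // returns false if there is none; it never blocks.
        boolean removed = queue.remove(job);
        System.out.println("removed " + job + ": " + removed);

        // Copy the remaining elements, head first.
        return new ArrayList<String>(queue);
    }

    public static void main(String[] args) {
        System.out.println(cancel("job-2")); // prints [job-1, job-3]
        System.out.println(cancel("job-9")); // prints [job-1, job-2, job-3]
    }
}
```

The search has to walk the queue element by element, which is why this kind of removal is better kept for exceptional cases.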
BlockingQueue implementations
Since BlockingQueue is an interface, you need to use one of its implementations to use it. The java.util.concurrent package (in Java 6) has several implementations of the BlockingQueue interface, including ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, and SynchronousQueue.
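Because calling code depends only on the interface, an implementation can be swapped without touching that code. The sketch below passes two standard implementations, ArrayBlockingQueue and PriorityBlockingQueue, to the same helper. The class name and the helper offerThenDrain are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical class name, used only for this sketch.
public class ImplementationSketch {

    // Works against the interface only: inserts three strings, then polls
    // until the queue is empty and returns the elements in removal order.
    static List<String> offerThenDrain(BlockingQueue<String> queue) {
        queue.offer("b");
        queue.offer("a");
        queue.offer("c");

        List<String> drained = new ArrayList<String>();
        String next;
        while ((next = queue.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    public static void main(String[] args) {
        // FIFO order: elements come out in insertion order.
        System.out.println(offerThenDrain(new ArrayBlockingQueue<String>(10))); // prints [b, a, c]
        // Priority order: elements come out in their natural (sorted) order.
        System.out.println(offerThenDrain(new PriorityBlockingQueue<String>())); // prints [a, b, c]
    }
}
```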
BlockingQueue example
This is an example of a Java BlockingQueue. This example uses the ArrayBlockingQueue implementation of the BlockingQueue interface.
First, the BlockingQueueExample class starts producers and consumers in different threads. The producer inserts strings into the shared BlockingQueue and the consumer takes them out.
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BlockingQueueExample {

        public static void main(String[] args) throws Exception {
            // Bounded queue shared by the producer and the consumer
            BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);

            // Run the producer and the consumer in separate threads
            new Thread(producer).start();
            new Thread(consumer).start();

            Thread.sleep(4000);
        }
    }
This is the producer class.
Notice how it sleeps for a second between the put() calls. This causes the consumer to block while it waits for objects to arrive in the queue.
    import java.util.concurrent.BlockingQueue;

    public class Producer implements Runnable {

        private final BlockingQueue<String> queue;

        public Producer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                queue.put("1"); // insert an element into the queue
                Thread.sleep(1000);
                queue.put("2");
                Thread.sleep(1000);
                queue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
This is the consumer class.
It simply takes objects from the queue and prints them to System.out.
    import java.util.concurrent.BlockingQueue;

    public class Consumer implements Runnable {

        private final BlockingQueue<String> queue;

        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                // take() returns as soon as the producer has put an element;
                // while the queue is empty, it blocks until one arrives
                System.out.println(queue.take());
                System.out.println(queue.take());
                System.out.println(queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }