Evolution of Producer and Consumer Problems in Java

Link to the original text: https://dzone.com/articles/th...

Author: Ioan Tinca

Translator: liumapp

Want to know more about the evolution of the producer-consumer problem in Java? Take a look at this article, where we solve it first in the classic way and then with newer approaches.

The producer-consumer problem is a typical multi-process synchronization problem.

For most people, it may be the first synchronization problem they encounter at school while implementing their first parallel algorithm.

Although simple, it captures the central challenge of parallel computing: multiple processes sharing a single resource.

Problem Statement

The producer and consumer programs share a common buffer of limited size.

The producer "produces" a piece of data and stores it in the buffer, while the consumer "consumes" the data and removes it from the buffer.

Assuming the two programs run concurrently, we need to make sure that the producer does not put new data into the buffer when it is full, and that the consumer does not try to remove data from the buffer when it is empty.

Solution

To solve this concurrency problem, the producer and the consumer have to communicate with each other.

If the buffer is full, the producer sleeps until a notification wakes it up.

After the consumer removes some data from the buffer, it notifies the producer, and the producer starts filling the buffer again.

If the buffer is empty, the situation is mirrored: the consumer waits for the producer's notification first.

But if this communication is done improperly, processes waiting on each other can deadlock the program.

Classical methods

First, let's look at a typical Java solution to this problem.

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ClassicProducerConsumerExample {
    public static void main(String[] args) throws InterruptedException {
        Buffer buffer = new Buffer(2);
        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    buffer.produce();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    buffer.consume();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }

    static class Buffer {
        private Queue<Integer> list;
        private int size;

        public Buffer(int size) {
            this.list = new LinkedList<>();
            this.size = size;
        }

        public void produce() throws InterruptedException {
            int value = 0;
            while (true) {
                synchronized (this) {
                    while (list.size() >= size) {
                        // wait for the consumer to free a slot
                        wait();
                    }
                    list.add(value);
                    System.out.println("Produced " + value);
                    value++;
                    // notify the consumer
                    notify();
                }
                // sleep outside the synchronized block so the lock is not held while sleeping
                Thread.sleep(1000);
            }
        }

        public void consume() throws InterruptedException {
            while (true) {
                synchronized (this) {
                    while (list.size() == 0) {
                        // wait for the producer to add an element
                        wait();
                    }
                    int value = list.poll();
                    System.out.println("Consumed " + value);
                    // notify the producer
                    notify();
                }
                Thread.sleep(1000);
            }
        }
    }
}

Here we have two threads, the producer and the consumer, which share a common buffer. The producer thread generates new elements and stores them in the buffer. If the buffer is full, the producer thread goes to sleep until a notification wakes it up. Otherwise, the producer creates a new element, stores it in the buffer, and notifies the consumer. The same applies to the consumer: if the buffer is empty, the consumer waits for the producer's notification; otherwise, it removes an element from the buffer and notifies the producer.

As you can see, in the example above the buffer object manages both the production and consumption logic. The threads do everything by simply calling buffer.produce() and buffer.consume().

Whether a buffer should be responsible for creating or deleting elements is debatable, but in my opinion it should not. Of course it depends on what you want to achieve, but in this case the buffer should only be responsible for storing and handing out elements in a thread-safe way, not for producing new ones.

So let's decouple the logic of production and consumption from the buffer object.

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample2 {
    public static void main(String[] args) throws InterruptedException {
        Buffer buffer = new Buffer(2);
        Thread producerThread = new Thread(() -> {
            try {
                int value = 0;
                while (true) {
                    buffer.add(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    int value = buffer.poll();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }

    static class Buffer {
        private Queue<Integer> list;
        private int size;

        public Buffer(int size) {
            this.list = new LinkedList<>();
            this.size = size;
        }

        public void add(int value) throws InterruptedException {
            synchronized (this) {
                while (list.size() >= size) {
                    // wait for the consumer to free a slot
                    wait();
                }
                list.add(value);
                notify();
            }
        }

        public int poll() throws InterruptedException {
            synchronized (this) {
                while (list.size() == 0) {
                    // wait for the producer to add an element
                    wait();
                }
                int value = list.poll();
                notify();
                return value;
            }
        }
    }
}

This is much better: now the buffer is only responsible for adding and removing elements in a thread-safe way.

Blocking Queue

However, we can make further improvements.

In the previous example, we created a buffer that waits for a free slot before storing an element, and waits for a new element to appear before removing one, so that both the store and remove operations are thread-safe.

However, Java's standard library already provides these operations. It is called BlockingQueue, and you can find its detailed documentation in the java.util.concurrent API docs.

BlockingQueue is a queue that is thread-safe to put elements into and take elements from, which is exactly what we need.

So, if we use a BlockingQueue in our example, we no longer need to implement the wait/notify mechanism ourselves.

Next, let's look at the specific code.

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerWithBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
        Thread producerThread = new Thread(() -> {
            try {
                int value = 0;
                while (true) {
                    blockingQueue.put(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        producerThread.start();
        consumerThread.start();
        producerThread.join();
        consumerThread.join();
    }
}

The runnables look the same as before and produce and consume elements in the same way.

The only difference is that we use a blockingQueue instead of our own buffer object.

More details about Blocking Queue

There are two main kinds of BlockingQueue:

  • Unbounded queue
  • Bounded queue

An unbounded queue can grow almost indefinitely, so add operations never block.

You can create an unbounded queue in this way:

BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>();

In this case, since add operations never block, the producer never has to wait: whenever it wants to add a new element, the queue stores it. There is a catch, however: if the consumer removes elements more slowly than the producer adds them, memory fills up and we will eventually get an OutOfMemoryError.
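To make this concrete, here is a minimal sketch (the class name UnboundedQueueDemo is mine, not from the original article) showing that a LinkedBlockingDeque created without a capacity argument is effectively unbounded, so offer() always succeeds immediately:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class UnboundedQueueDemo {
    public static void main(String[] args) {
        // No capacity argument: the deque is "unbounded" (capacity Integer.MAX_VALUE)
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>();
        System.out.println(queue.remainingCapacity()); // prints 2147483647

        // offer() never blocks here; it keeps returning true until memory runs out
        for (int i = 0; i < 1_000_000; i++) {
            queue.offer(i);
        }
        System.out.println(queue.size()); // prints 1000000
    }
}
```

A million boxed integers is already a noticeable chunk of heap; a producer that permanently outpaces its consumer on such a queue is the OutOfMemoryError scenario described above.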

In contrast, a bounded queue has a fixed size. You can create it this way:

BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(10);

The main difference between the two is that with a bounded queue, if the queue is full and the producer still tries to push elements into it, the operation blocks (depending on which method was used to add the element) until enough space is freed up.

There are four ways to add an element to a blocking queue:

  • add() - returns true if the insertion succeeded, otherwise throws an IllegalStateException
  • put() - inserts the element into the queue, waiting for a free slot if necessary
  • offer() - returns true if the insertion succeeded, otherwise returns false
  • offer(E e, long timeout, TimeUnit unit) - inserts the element if the queue is not full, waiting up to the specified time for a free slot to become available

So, if you insert elements with put() and the queue is full, the producer has to wait until a slot becomes available.
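As a rough sketch of these behaviors (the class name FullQueueDemo is my own), here is what the non-blocking insertion methods do on a bounded queue that is already full:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class FullQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(1);
        queue.put(1); // the queue is now full

        // offer() fails immediately and returns false
        System.out.println(queue.offer(2)); // prints false

        // the timed offer() waits up to 100 ms for a slot, then gives up
        System.out.println(queue.offer(2, 100, TimeUnit.MILLISECONDS)); // prints false

        // add() throws on a full queue
        try {
            queue.add(2);
        } catch (IllegalStateException e) {
            System.out.println("add() threw IllegalStateException");
        }

        // queue.put(2) would block here until a consumer removes an element
    }
}
```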

That is exactly what happens in our last example, which behaves the same as ProducerConsumerExample2.

Using thread pools

What else can we optimize? Let's start by analyzing what we did. We instantiated two threads: one called the producer, which pushes elements into the queue, and the other called the consumer, which removes elements from the queue.

However, good software practice tells us that creating and destroying threads manually is a bad idea. Creating a thread is an expensive operation; every time one is created, the following steps have to happen:

  • Memory is allocated for the thread's stack
  • The operating system creates a native thread corresponding to the Java thread
  • Descriptors associated with the thread are added to JVM-internal data structures

First of all, don't get me wrong: there is nothing wrong with using a couple of threads in our case; that is how concurrent work gets done. The problem is that we create the threads manually, which is bad practice. Besides the creation cost, manual thread creation gives us no control over how many threads run at the same time. For example, if an online service receives one million requests at once and each request spawns its own thread, we end up with one million threads running in the background, which leads to thread starvation.

So we need a way to manage threads globally, and for that we use thread pools.

A thread pool handles the lifecycle of its threads based on the strategy we choose. It keeps a limited number of idle threads and activates them when tasks need to be processed. This way we don't need to create a new thread for every new request, and we avoid thread starvation.

A Java thread pool implementation consists of:

  • A task queue
  • A collection of worker threads
  • A thread factory
  • Metadata for managing thread pool status
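Assuming the standard java.util.concurrent API, the four ingredients above can be wired together explicitly with ThreadPoolExecutor instead of the Executors factory methods; the class name ExplicitPoolDemo and the thread name are my own choices:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExplicitPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // The thread factory decides how worker threads are created
        ThreadFactory factory = r -> new Thread(r, "pool-worker");

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                          // core and maximum number of worker threads
                0L, TimeUnit.MILLISECONDS,     // keep-alive time for excess idle threads
                new ArrayBlockingQueue<>(10),  // the task queue
                factory);

        pool.execute(() -> System.out.println("task ran"));

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // The pool also keeps metadata about its own state
        System.out.println(pool.getCompletedTaskCount()); // prints 1
    }
}
```

Executors.newFixedThreadPool(2), used later in the article, builds essentially this configuration for you, with an unbounded task queue.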

To run tasks concurrently, you first put them into the task queue. Then, whenever a thread becomes available, it picks up a task and runs it. The more threads are available, the more tasks run in parallel.

Besides managing the thread lifecycle, thread pools have another advantage: they change how you think about splitting work for concurrent execution. The unit of parallelism is no longer the thread but the task. Instead of having threads run concurrently over a shared block of memory, you design tasks that execute concurrently. Thinking in terms of tasks helps us avoid common multithreading problems such as deadlocks or data races. Nothing prevents us from running into these issues again, but the more functional style makes us far less tempted to synchronize parallel computations imperatively with locks, so they are much less likely than with threads and shared memory used directly. In our example the tasks still share a blocking queue, which is not ideal, but I wanted to emphasize this advantage.
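The idea of tasks as the unit of parallelism can be sketched like this (my own example, not from the original article): a computation is split into independent Callable tasks whose results are combined through futures, with no shared mutable state and no locks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskParallelismDemo {
    public static void main(String[] args) throws Exception {
        // Two worker threads, four tasks: at most two tasks run at once,
        // the remaining tasks wait in the pool's task queue.
        ExecutorService executor = Executors.newFixedThreadPool(2);

        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            final int n = i;
            tasks.add(() -> n * n); // each task is independent: no shared state, no locks
        }

        // invokeAll blocks until every task has completed
        int sum = 0;
        for (Future<Integer> future : executor.invokeAll(tasks)) {
            sum += future.get();
        }
        executor.shutdown();

        System.out.println(sum); // prints 30 (1 + 4 + 9 + 16)
    }
}
```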

You can find more about thread pools in the Java concurrency documentation.

Enough theory; let's see how a thread pool is used in our case.

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerExecutorService {
    public static void main(String[] args) {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Runnable producerTask = () -> {
            try {
                int value = 0;
                while (true) {
                    blockingQueue.put(value);
                    System.out.println("Produced " + value);
                    value++;
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable consumerTask = () -> {
            try {
                while (true) {
                    int value = blockingQueue.take();
                    System.out.println("Consumed " + value);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executor.execute(producerTask);
        executor.execute(consumerTask);
        executor.shutdown();
    }
}

The difference here is that we no longer create or run the producer and consumer threads manually. Instead, we build a thread pool and hand it two tasks, the producer task and the consumer task, which are the same runnables we used in the previous example. The executor (the thread pool implementation) receives the tasks and schedules its worker threads to execute them.

In our simple case, everything works as before: we still have two threads, and they still produce and consume elements in the same way. We haven't improved performance, but the code looks much cleaner. Instead of creating threads manually, we simply declare what we want: certain tasks executed concurrently.

So when you use a thread pool, you no longer think of threads as the unit of concurrent execution; you think of tasks instead. That is all you need to care about; the executor handles the rest. It receives tasks and assigns worker threads to process them.

Summary

First, we looked at the "classic" solution to the producer-consumer problem. We tried to avoid reinventing the wheel and reused well-tested solutions instead: rather than writing our own wait/notify mechanism, we used the BlockingQueue that Java already provides, and rather than creating threads manually, we let Java's efficient thread pools manage the thread lifecycle. With these improvements, the solution to the producer-consumer problem is more reliable and much easier to understand.


Added by tmk4php on Mon, 05 Aug 2019 11:12:16 +0300