Producer and consumer model of Java multithreading

1 Producers and consumers in multithreading

The producer-consumer pattern solves many common problems in concurrent programming. It improves the overall data-processing throughput of a program by balancing the working capacity of the producer threads and the consumer threads.

1.1 Why use the producer and consumer model

In the thread world, a producer is a thread that produces data and a consumer is a thread that consumes data. In multithreaded development, if the producer and the consumer are coupled directly and the producer is much faster than the consumer, the producer must wait for the consumer to finish processing before it can produce more data. Similarly, if the consumer is faster than the producer, the consumer must wait for the producer. The producer and consumer model exists to solve this imbalance between production and consumption capacity.

1.2 What is the producer consumer model

The producer-consumer model removes the tight coupling between producer and consumer by placing a container between them. Producers and consumers do not communicate with each other directly; they communicate through a blocking queue. After producing data, a producer does not wait for a consumer to process it but simply puts it into the blocking queue. Consumers do not ask producers for data but take it directly from the blocking queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.
The blocking queue is what decouples producers from consumers. Most design patterns introduce a third party for decoupling. For example, the third party of the factory pattern is the factory class, and the third party of the template pattern is the template class. When learning a design pattern, finding its third party first helps us become familiar with the pattern quickly.
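
The decoupling described above can be sketched with the standard java.util.concurrent.BlockingQueue. This is a minimal illustration, not the example used later in this article: the class name BlockingQueueDemo, the method runOnce and the POISON_PILL sentinel are assumptions made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical sentinel value that tells the consumer to stop.
    static final int POISON_PILL = -1;

    // Runs one producer and one consumer that share nothing but the queue.
    static List<Integer> runOnce(int itemCount, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i);           // blocks while the queue is full
                }
                queue.put(POISON_PILL);     // signal that no more data will come
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(5, 2)); // prints [0, 1, 2, 3, 4]
    }
}
```

Neither thread holds a reference to the other. The queue capacity (2 here) is the only thing that limits how far the producer can run ahead of the consumer.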

1.3 Actual operation

For multithreaded programs in any programming language, the producer and consumer model is one of the most classic examples.
Strictly speaking, it should be called the producer-warehouse-consumer model: without the shared storage in between, the producer-consumer model does not hold together.
For this model, the following points should be clarified:

  1. Producers produce only when the warehouse is not full, and stop producing when it is full.
  2. Consumers consume only when there are products in the warehouse, and wait when it is empty.
  3. When consumers find that there is nothing to consume in the warehouse, they notify the producers to produce.
  4. After producing products, producers notify the waiting consumers that they can consume.

This model uses the wait, notify and notifyAll methods of java.lang.Object to meet the above requirements. This is very important.

/** 
* Java Threads: concurrent collaboration producer consumer model 
*/ 
public class Test { 
        public static void main(String[] args) { 
                Godown godown = new Godown(30); 
                Consumer c1 = new Consumer(50, godown); 
                Consumer c2 = new Consumer(20, godown); 
                Consumer c3 = new Consumer(30, godown); 
                Producer p1 = new Producer(10, godown); 
                Producer p2 = new Producer(10, godown); 
                Producer p3 = new Producer(10, godown); 
                Producer p4 = new Producer(10, godown); 
                Producer p5 = new Producer(10, godown); 
                Producer p6 = new Producer(10, godown); 
                Producer p7 = new Producer(80, godown); 

                c1.start(); 
                c2.start(); 
                c3.start(); 
                p1.start(); 
                p2.start(); 
                p3.start(); 
                p4.start(); 
                p5.start(); 
                p6.start(); 
                p7.start(); 
        } 
} 

/** 
* Warehouse 
*/ 
class Godown { 
        public static final int max_size = 100; //Maximum inventory 
        public int curnum;     //Current inventory 

        Godown() { 
        } 

        Godown(int curnum) { 
                this.curnum = curnum; 
        } 

        /** 
         * Produce a specified number of products 
         * 
         * @param neednum number of products to produce
         */ 
        public synchronized void produce(int neednum) { 
                //Test whether production is required 
                while (neednum + curnum > max_size) { 
                        System.out.println("The quantity to produce (" + neednum + ") exceeds the remaining capacity (" + (max_size - curnum) + "), so the production task cannot run for now!");
                        try { 
                                //Current production thread waiting 
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //If the production conditions are met, the production is carried out. Here, the current inventory is simply changed 
                curnum += neednum; 
                System.out.println(neednum + " products have been produced, and the current warehouse reserve is " + curnum);
                //Wake up all threads waiting on this object monitor 
                notifyAll(); 
        } 

        /** 
         * Consume a specified number of products 
         * 
         * @param neednum number of products to consume
         */ 
        public synchronized void consume(int neednum) { 
                //Test whether it can be consumed 
                while (curnum < neednum) { 
                        try { 
                                //The current consumer thread waits
                                wait(); 
                        } catch (InterruptedException e) { 
                                e.printStackTrace(); 
                        } 
                } 
                //If the consumption conditions are met, consumption will be carried out. Here, simply change the current inventory 
                curnum -= neednum; 
                System.out.println(neednum + " products have been consumed, and the current warehouse reserve is " + curnum);
                //Wake up all threads waiting on this object monitor 
                notifyAll(); 
        } 
} 

/** 
* producer 
*/ 
class Producer extends Thread { 
        private int neednum;                //Quantity of products produced 
        private Godown godown;            //Warehouse 

        Producer(int neednum, Godown godown) { 
                this.neednum = neednum; 
                this.godown = godown; 
        } 

        public void run() { 
                //Produce a specified number of products 
                godown.produce(neednum); 
        } 
} 

/** 
* consumer 
*/ 
class Consumer extends Thread { 
        private int neednum;                //Quantity of products consumed
        private Godown godown;            //Warehouse 

        Consumer(int neednum, Godown godown) { 
                this.neednum = neednum; 
                this.godown = godown; 
        } 

        public void run() { 
                //Consume a specified number of products 
                godown.consume(neednum); 
        } 
}

Sample output from one run (the order can differ between runs because it depends on thread scheduling):
 
10 products have been produced, and the current warehouse reserve is 40 
10 products have been produced, and the current warehouse reserve is 50 
50 products have been consumed, and the current warehouse reserve is 0 
80 products have been produced, and the current warehouse reserve is 80 
30 products have been consumed, and the current warehouse reserve is 50 
10 products have been produced, and the current warehouse reserve is 60 
20 products have been consumed, and the current warehouse reserve is 40 
10 products have been produced, and the current warehouse reserve is 50 
10 products have been produced, and the current warehouse reserve is 60 
10 products have been produced, and the current warehouse reserve is 70 

Explanation:
In this example, when the production or consumption condition cannot be met, the thread calls the wait method of the object. The wait method releases the lock held by the current thread and suspends that thread until another thread calls notify() or notifyAll() on the same object. After a thread has changed the inventory, it calls notifyAll() to wake up the other threads waiting on the object, and each of them re-checks its condition in the while loop. In this way, the producer and consumer threads cooperate correctly.
The notifyAll() method only serves as a notification. It neither releases nor acquires the lock; it just tells the threads waiting on the object that they can compete for the lock again. The lock itself is released when the notifying thread leaves the synchronized method, and only then can one of the awakened threads reacquire it and continue.
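
The ordering between wait and notifyAll can be checked with a small sketch. It is an illustration only: the class WaitNotifyOrder, its fields and the event strings are hypothetical names introduced here. The main thread can only enter its synchronized block because the waiter gave up the lock inside wait(), and the waiter resumes only after the notifier has left that block.

```java
import java.util.ArrayList;
import java.util.List;

class WaitNotifyOrder {
    private final Object lock = new Object();
    private final List<String> events = new ArrayList<>(); // guarded by lock
    private boolean waiterParked = false; // waiter has entered its block and is about to wait
    private boolean ready = false;        // condition the waiter is waiting for

    List<String> run() {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                waiterParked = true;
                events.add("waiter: about to wait");
                lock.notifyAll();             // tell the notifier we got here
                while (!ready) {              // loop guards against spurious wakeups
                    try {
                        lock.wait();          // releases the lock and suspends this thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                events.add("waiter: resumed");
            }
        });
        waiter.start();

        try {
            synchronized (lock) {
                while (!waiterParked) {
                    lock.wait();              // let the waiter go first
                }
                ready = true;
                lock.notifyAll();             // wakes the waiter but keeps the lock
                events.add("notifier: notified, lock still held");
            }                                 // the lock is released here
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : new WaitNotifyOrder().run()) {
            System.out.println(event);
        }
        // prints:
        // waiter: about to wait
        // notifier: notified, lock still held
        // waiter: resumed
    }
}
```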

This example is only the simplest form of the producer-consumer model. If a consumer asks for more than the warehouse holds and no producer is left to restock it, the program waits forever, which is of course wrong. The example could be modified so that consumption drives production while still respecting the warehouse capacity: produce whenever the warehouse is not full, and cap the amount consumed each time, so that the problem does not arise. Of course, such an example is more complex, and it makes a simple model harder to explain.
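
One way to avoid the endless wait, different from the consumption-driven redesign mentioned above, is to give up after a timeout. The following is a rough sketch under that assumption. The class TimedGodown and the methods consume(int, long), tryProduce and current are hypothetical and are not part of the example in this article.

```java
import java.util.concurrent.TimeUnit;

class TimedGodown {
    private final int maxSize;   // maximum inventory
    private int curnum;          // current inventory

    TimedGodown(int maxSize, int curnum) {
        this.maxSize = maxSize;
        this.curnum = curnum;
    }

    /** Tries to consume; returns false if stock is still short after timeoutMillis. */
    synchronized boolean consume(int neednum, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (curnum < neednum) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return false;              // give up instead of waiting forever
            }
            try {
                wait(remainingMillis);     // never 0 here: wait(0) would wait indefinitely
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        curnum -= neednum;
        notifyAll();                       // producers may now have room
        return true;
    }

    /** Adds stock if it fits; returns false rather than blocking when it does not. */
    synchronized boolean tryProduce(int neednum) {
        if (curnum + neednum > maxSize) {
            return false;
        }
        curnum += neednum;
        notifyAll();                       // consumers may now have enough stock
        return true;
    }

    synchronized int current() {
        return curnum;
    }

    public static void main(String[] args) {
        TimedGodown godown = new TimedGodown(100, 30);
        System.out.println(godown.consume(50, 100)); // false: nobody produces, so it times out
        System.out.println(godown.tryProduce(40));   // true: 30 + 40 fits into 100
        System.out.println(godown.consume(50, 100)); // true: 70 in stock
        System.out.println(godown.current());        // 20
    }
}
```

The caller decides what a false return means, for example retrying later or reporting that the order cannot be fulfilled.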

Added by fitzromeo on Mon, 03 Jan 2022 19:57:37 +0200