Java multithreading basics: producers and consumers

Java multithreading basics (12): producers and consumers

1, Producer and consumer model

The producer-consumer problem is a classic multithreading problem. It involves a "producer", a "consumer", a "warehouse" and a "product", which are related as follows:

① The producer produces only while the warehouse is not full, and stops producing when the warehouse is full.
② The consumer can consume only while there are products in the warehouse, and waits when the warehouse is empty.
③ When the consumer finds that there is nothing left in the warehouse to consume, it notifies the producer to produce.
④ When the producer has produced consumable products, it notifies the waiting consumers to consume.


More specifically, the producer-consumer model describes a system with two roles, producer and consumer, that communicate through a memory buffer. The producer produces the data that the consumer needs, and the consumer processes that data into products. The producer-consumer model is shown in the figure below.

As the number of services grows, an operation such as user registration may be decoupled into several independent services (account verification, email verification code, SMS code, etc.). These services act as consumers and wait for user input. After the front end submits the data, it is split up and sent to the URL of each service; the component that distributes it plays the role of the producer. A consumer may not be able to process all of the data at once, so each one has a request queue, which is the memory buffer. Frameworks that do this are called message queues.
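
The request queue described above can be sketched with the JDK's BlockingQueue, whose put() and take() block while the queue is full or empty. This is only an illustration under assumptions: the class name RequestQueueSketch, the capacity of 2 and the "request-N" strings are hypothetical and do not come from any particular message queue framework.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration; none of these names come from the article.
class RequestQueueSketch {

    // Sends `count` requests through a bounded queue and returns how many the consumer handled.
    static int run(int count) {
        // The bounded queue plays the role of the memory buffer (assumed capacity: 2).
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        int[] handled = {0};

        // Consumer: a service that takes requests from its own queue.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.take();          // blocks while the queue is empty
                    handled[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: the distributor that hands requests to the service.
            for (int i = 0; i < count; i++) {
                queue.put("request-" + i); // blocks while the queue is full
            }
            // join() also makes the consumer's updates visible to this thread.
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled[0];
    }

    public static void main(String[] args) {
        System.out.println("handled = " + run(5));
    }
}
```

The producer never needs to know how fast the consumer is: a full queue simply makes put() wait, which is the same back-pressure that rule ① describes for the warehouse.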

2, Producer and consumer implementation

The model is implemented below with a bread production example based on wait()/notifyAll() (once thread pools have been covered, the producer-consumer model will be implemented in other ways).

Bread class

public class Bread {
    private final int capacity;    // Maximum quantity of bread the warehouse can hold
    private int size;              // Quantity of bread currently in stock
    public Bread(int capacity) {
        this.capacity = capacity;
        this.size = 0;
    }

    // Produce val units of bread, waiting whenever the warehouse is full
    public synchronized void produce(int val) {
        try {
            // left is the quantity still to be produced (the request may exceed the free space, so several rounds may be needed)
            int left = val;
            while (left > 0) {
                // While the warehouse is full, wait for a consumer to consume some bread.
                while (size >= capacity)
                    wait();
                // inc is the quantity actually added to the stock in this round.
                // If size + left exceeds capacity, fill the warehouse: inc = capacity - size.
                // Otherwise produce everything that is left: inc = left.
                int inc = (size+left)>capacity ? (capacity-size) : left;
                size += inc;
                left -= inc;
                System.out.printf("%s produce(%3d) --> left=%3d, inc=%3d, size=%3d\n",
                        Thread.currentThread().getName(), val, left, inc, size);
                // Notify waiting consumers that there is bread to consume.
                notifyAll();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
    // Consume val units of bread, waiting whenever the warehouse is empty
    public synchronized void consume(int val) {
        try {
            // left is the quantity still to be consumed (the request may exceed the stock, so several rounds may be needed)
            int left = val;
            while (left > 0) {
                // While the warehouse is empty, wait for a producer to produce some bread.
                while (size <= 0)
                    wait();
                // dec is the quantity actually removed from the stock in this round.
                // If the stock is smaller than left, take the whole stock: dec = size.
                // Otherwise take everything that is left: dec = left.
                int dec = (size<left) ? size : left;
                size -= dec;
                left -= dec;
                System.out.printf("%s consume(%3d) <-- left=%3d, dec=%3d, size=%3d\n",
                        Thread.currentThread().getName(), val, left, dec, size);
                // Notify waiting producers that there is free space again.
                notifyAll();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

Producer class

public class Producer {
    private final Bread bread;
    public Producer(Bread bread) {
        this.bread = bread;
    }
    // Start a new thread that produces val units of bread
    public void produce(final int val) {
        new Thread(() -> {
            bread.produce(val);
        }).start();
    }
}

Customer class (the consumer)

public class Customer {
    private final Bread bread;
    public Customer(Bread bread) {
        this.bread = bread;
    }
    // Start a new thread that consumes val units of bread
    public void consume(final int val) {
        new Thread(() -> {
            bread.consume(val);
        }).start();
    }
}

Test class code

public class Demo {
    public static void main(String[] args) {
        Bread bread = new Bread(100);
        Producer producer = new Producer(bread);
        Customer customer = new Customer(bread);

        producer.produce(60);
        producer.produce(120);
        customer.consume(90);
        customer.consume(150);
        producer.produce(110);
    }
}
// Output of one run (the interleaving depends on thread scheduling, so other runs may differ)
Thread-1 produce( 60) --> left=  0, inc= 60, size= 60
Thread-5 produce(110) --> left= 70, inc= 40, size=100
Thread-4 consume(150) <-- left= 50, dec=100, size=  0
Thread-2 produce(120) --> left= 20, inc=100, size=100
Thread-3 consume( 90) <-- left=  0, dec= 90, size= 10
Thread-4 consume(150) <-- left= 40, dec= 10, size=  0
Thread-5 produce(110) --> left=  0, inc= 70, size= 70
Thread-4 consume(150) <-- left=  0, dec= 40, size= 30
Thread-2 produce(120) --> left=  0, inc= 20, size= 50

Explanation:

① Producer is the "producer" class, which is associated with "bread". When its produce() method is called, it starts a new thread that adds products to the Bread object.
② Customer is the "consumer" class, which is also associated with "bread". When its consume() method is called, it starts a new thread that consumes products from the Bread object.
③ Bread is the "bread" class (the warehouse), which records the capacity of the warehouse and the quantity of bread currently in it.
The produce() and consume() methods of the Bread class are synchronized methods. Entering a synchronized method body means that the thread has acquired the synchronization lock of the Bread object, so at any moment at most one producer or consumer thread can be running inside these methods. The lock thereby provides mutually exclusive access to the warehouse. A thread that calls wait() releases the lock while it waits, which is what allows the other side to make progress.
For the production method produce(): when the warehouse is full, the producer thread waits until a consumer has consumed some bread. After producing, the producer thread calls notifyAll() to wake up all threads waiting on the synchronization lock, including consumer threads; this is what we call "notifying the consumer to consume".
For the consumption method consume(): when the warehouse is empty, the consumer thread waits until a producer has produced some bread. After consuming, the consumer thread calls notifyAll() to wake up all threads waiting on the synchronization lock, including producer threads; this is what we call "notifying the producer to produce".
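
The wait-in-a-loop and notifyAll() pattern described above can be reduced to a buffer that holds a single item. The sketch below is hypothetical (the class SlotSketch and its methods are not part of the bread example); it only isolates the idiom. The condition is re-checked in a while loop because notifyAll() wakes every waiting thread, producers and consumers alike, and a woken thread cannot assume that its condition now holds.

```java
// Hypothetical single-slot buffer; the names are illustrative only.
class SlotSketch {
    private Integer item;   // null means the slot is empty

    synchronized void put(int value) throws InterruptedException {
        // Re-check after every wake-up: another producer may have refilled the slot.
        while (item != null)
            wait();         // releases the lock while waiting
        item = value;
        notifyAll();        // wake waiting consumers (and producers, which re-check and wait again)
    }

    synchronized int take() throws InterruptedException {
        // Re-check after every wake-up: another consumer may have emptied the slot.
        while (item == null)
            wait();
        int value = item;
        item = null;
        notifyAll();        // wake waiting producers
        return value;
    }

    // Passes 1..n through the slot and returns the sum seen by the consumer.
    static int run(int n) {
        SlotSketch slot = new SlotSketch();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++)
                    slot.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++)
                sum += slot.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(10));   // 1 + 2 + ... + 10 = 55
    }
}
```

Replacing either while with an if would usually still work with one producer and one consumer, but it is not safe in general: with several threads on each side, a thread could proceed after a wake-up that was meant for someone else.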




Keywords: Java

Added by robburne on Fri, 26 Jun 2020 09:26:30 +0300