Producer consumer model and two solutions

Thread collaboration

Thread communication

Application scenario: producer and consumer issues

  • Suppose that only one product can be stored in the warehouse, the producer puts the produced products into the warehouse, and the consumer takes the products from the warehouse for consumption.
  • If there is no product in the warehouse, the producer will put the product into the warehouse, otherwise stop production and wait until the product in the warehouse is taken away by the consumer.
  • If there is a product in the warehouse, the consumer can take the product away for consumption, otherwise stop consumption and wait until the product is put in the warehouse again.

Thread communication - Analysis

This is a problem of thread synchronization. Producers and consumers share the same resource, and producers and consumers are interdependent and conditional on each other.

  • For producers, they should inform consumers to wait before producing products, and after producing products, they need to inform consumers to consume immediately.
  • For consumers, after consumption, they should inform producers that they have finished consumption and need to produce new products for consumption.
  • In the producer consumer problem, synchronized is not enough.
    • synchronized can prevent concurrent updates of the same shared resource and realize synchronization.
    • synchronized cannot be used for message passing (Communication) between different threads

Thread communication

Java provides several methods to solve the communication problem between threads

wait(): indicates that the thread waits until other threads notify it. Unlike sleep, the lock will be released.

wait(long timeout): Specifies the number of milliseconds to wait

notify(): wakes up a waiting thread.

notifyAll(): wakes up all threads calling the wait() method on the same object. Threads with higher priority are scheduled first

Note: all methods in the Object class can only be used in synchronization methods or synchronization code blocks, otherwise an exception IIIegalMonitorStateException will be thrown

Solution 1

Concurrent collaboration model "producer / consumer model" - > management process method

  • Producer: the module responsible for production data (may be method, object, thread, process)
  • Consumer: module responsible for processing data (may be method, object, thread, process)
  • Buffer: consumers cannot directly use the producer's data. There is a buffer between them

The producer puts the produced data into the buffer, and the consumer takes out the data from the buffer

Code implementation (customers go to KFC to buy chicken):

//Test: producer consumer model -- > using buffer solution: pipe process method

//Producer, consumer, product, buffer
public class TestPC {

    public static void main(String[] args) {
        SynContainer container = new SynContainer();

        Productor productor = new Productor(container);
        Consumer consumer = new Consumer(container);

        productor.start();
        consumer.start();
    }
}

//producer
class Productor extends Thread{
    SynContainer container;

    public Productor(SynContainer container){
        this.container = container;
    }

    //production

    @Override
    public void run() {
        for (int i = 1; i <= 100; i++) {
            container.push(new Chicken(i));
            System.out.println("Produced" + (i) +"Chicken");
        }
    }
}

//consumer
class Consumer extends Thread{
    SynContainer container;

    public Consumer(SynContainer container){
        this.container = container;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Consumption" + container.pop().id + "Chicken");
        }
    }
}

//product
class Chicken{
    int id; //Product number

    public Chicken(int id) {
        this.id = id;
    }
}

//buffer
class SynContainer{

    //A container is required
    Chicken[] chickens = new Chicken[10];
    //Container counter
    int count = 0;

    //The producer puts in the product
    public synchronized void push(Chicken chicken){
        //If the container is full, it needs to wait for consumers to consume
        if (count == chickens.length) {
            //Inform consumers to consume and producers to wait
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //If it is not full, we need to throw in the product
        chickens[count++] = chicken;

        //Consumers can be informed of consumption
        this.notifyAll();
    }

Solution 2

Concurrent collaboration model "producer / consumer model" - > semaphore method

Code implementation (actor performance, audience viewing):

//Test producer consumer problem 2: signal lamp method, flag bit solution
public class TestPC2 {

    public static void main(String[] args) {
        TV tv = new TV();

        Player player = new Player(tv);
        Watcher watcher = new Watcher(tv);

        player.start();
        watcher.start();
    }
}

//producer
class Player extends Thread{

    TV tv = new TV();

    public Player(TV tv){
        this.tv = tv;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            if (i % 2 == 0) {
                tv.play("Happy camp playing");
            } else {
                tv.play("advertisement");
            }
        }
    }
}

//consumer
class Watcher extends Thread{
    TV tv = new TV();

    public Watcher(TV tv){
        this.tv = tv;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            tv.watch();
        }
    }
}

//Products -- > Programs
class TV{
    //The actor performs and the audience waits for T
    //The audience watched and the actors waited for F
    String voice; //A performance
    boolean flag = true;

    //perform
    public synchronized void play(String voice){

        if (!flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("The actors performed:" + voice);
        //Inform the audience to watch
        this.notifyAll();
        this.voice = voice;

        this.flag = !this.flag;
    }

    public synchronized void watch(){

        if (flag){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("The audience watched:" + this.voice);

        //Inform the actors to perform
        this.notifyAll();
        this.flag = !this.flag;
    }
}

Added by Assim on Thu, 23 Dec 2021 14:48:09 +0200