The producer-consumer model is a classic pattern for cooperation between concurrent threads. Working through the producer-consumer problem is a good way to deepen your understanding of concurrent programming.
The producer-consumer problem involves two kinds of threads: producer threads, which produce data, and consumer threads, which consume it.
To decouple producers from consumers, a shared data area is usually placed between them, much like a warehouse. A producer puts the data it produces into the shared data area without caring what the consumers do; a consumer takes data from the shared data area without caring what the producers do.
However, the shared data area must support coordination between the concurrent threads:
- If the shared data area is full, producers are blocked from adding more data;
- If the shared data area is empty, consumers are blocked from taking data.
There are three ways to solve the producer-consumer problem:
1. Use the wait/notify notification mechanism of Object;
2. Use the await/signal notification mechanism of a Lock's Condition;
3. Use a BlockingQueue.
This article summarizes the three implementations.
The wait/notify notification mechanism
1. Background
In Java, threads can communicate through the wait(), notify() and notifyAll() methods that every object inherits from Object. A thread that calls wait() blocks until another thread calls notify() or notifyAll() on the same object; only then does it return from wait() and continue.
wait(): puts the current thread to sleep until it is notified or interrupted. Before calling wait(), the thread must hold the monitor lock of the object, so wait() can only be called inside a synchronized method or a synchronized block.
Calling wait() releases the lock. If the thread does not hold the lock when it calls wait(), an IllegalMonitorStateException is thrown; this is a RuntimeException. The thread returns from wait() only after it has acquired the lock again.
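The lock requirement described above can be checked with a small sketch. The class and method names (`WaitWithoutLockDemo`, `waitWithoutLockThrows`, `waitWithLockSucceeds`) are made up for illustration, and a timed `wait(10)` is used only so that the legal call returns by itself.

```java
class WaitWithoutLockDemo {

    // Returns true if wait() called without owning the monitor throws IllegalMonitorStateException.
    static boolean waitWithoutLockThrows() {
        Object lock = new Object();
        try {
            lock.wait(10); // not inside synchronized (lock): the monitor is not held
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns true if a timed wait inside a synchronized block completes normally.
    static boolean waitWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            try {
                lock.wait(10); // monitor held: legal, returns after about 10 ms
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("wait without lock throws: " + waitWithoutLockThrows());
        System.out.println("wait with lock succeeds:  " + waitWithLockSucceeds());
    }
}
```

Because IllegalMonitorStateException is unchecked, the compiler does not warn about a missing synchronized block; the mistake only shows up at run time.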
notify(): this method must also be called inside a synchronized method or a synchronized block, that is, the thread must hold the object's monitor lock before the call. If notify() is called without holding the lock, it also throws IllegalMonitorStateException.
notify() picks one arbitrary thread that is waiting on the object and moves it from the wait queue to the synchronization queue, where it competes for the lock again; once it gets the lock, it can return from wait().
After calling notify(), the current thread does not release the lock immediately. The lock is released only when the thread exits the synchronized block.
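The fact that notify() does not release the lock can be made visible by recording the order of events. The following is a minimal sketch; the class name `NotifyKeepsLockDemo`, the event strings and the 100 ms sleep are assumptions chosen for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

class NotifyKeepsLockDemo {

    // Returns the order in which the two threads recorded their steps.
    static List<String> run() {
        final Object lock = new Object();
        final List<String> events = new ArrayList<>(); // only touched while holding lock
        final boolean[] ready = {false};               // only touched while holding lock

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (!ready[0]) {
                        lock.wait(); // releases the lock while waiting
                    }
                    events.add("waiter resumed"); // runs only after the lock is reacquired
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();

        // Spin until the waiter is parked inside wait().
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }

        synchronized (lock) {
            ready[0] = true;
            lock.notify();
            events.add("notify called");
            try {
                Thread.sleep(100); // still inside the synchronized block, so the lock is still held
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("notifier exits synchronized block");
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The waiter's entry always comes last: it has been notified, but it cannot return from wait() until the notifying thread leaves its synchronized block.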
notifyAll() works the same way as notify(). The important difference is that notifyAll() wakes every thread waiting on the object, so they all move from the wait queue to the synchronization queue and compete for the object's monitor lock.
2. Potential problems with wait/notify
1) Early notification
Early notification is easy to understand: threadB calls notify() before threadA has started to wait, so the notification has no effect. When threadA later calls wait(), it blocks until it is interrupted or another notification arrives.
The following code simulates the problem caused by an early notification:
    public class EarlyNotify {
        // A dedicated lock object; a String literal is interned and could be shared with unrelated code.
        private static final Object lockObject = new Object();

        public static void main(String[] args) {
            WaitThread waitThread = new WaitThread(lockObject);
            NotifyThread notifyThread = new NotifyThread(lockObject);
            notifyThread.start();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Started long after the notification has been sent
            waitThread.start();
        }

        static class WaitThread extends Thread {
            private final Object lock;

            public WaitThread(Object lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " Enter code block");
                        System.out.println(Thread.currentThread().getName() + " start wait");
                        lock.wait(); // blocks forever: the notification is already gone
                        System.out.println(Thread.currentThread().getName() + " end wait");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        static class NotifyThread extends Thread {
            private final Object lock;

            public NotifyThread(Object lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " Enter code block");
                    System.out.println(Thread.currentThread().getName() + " start notify");
                    lock.notify();
                    System.out.println(Thread.currentThread().getName() + " end notify");
                }
            }
        }
    }
The example starts two threads, WaitThread and NotifyThread. NotifyThread starts first and calls notify() first.
WaitThread then starts and calls wait(). Because the notification has already been sent, wait() never receives it, and WaitThread blocks in wait() forever. This is the early-notification problem.
The usual fix is to add a status flag that WaitThread checks before calling wait(). If the notification has already been sent, WaitThread does not wait.
The corrected code:
    public class EarlyNotify {
        // A dedicated lock object; a String literal is interned and could be shared with unrelated code.
        private static final Object lockObject = new Object();
        // Guarded by lockObject: true until the notification has been sent.
        private static boolean isWait = true;

        public static void main(String[] args) {
            WaitThread waitThread = new WaitThread(lockObject);
            NotifyThread notifyThread = new NotifyThread(lockObject);
            notifyThread.start();
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            waitThread.start();
        }

        static class WaitThread extends Thread {
            private final Object lock;

            public WaitThread(Object lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        // Wait only if the notification has not been sent yet
                        while (isWait) {
                            System.out.println(Thread.currentThread().getName() + " Enter code block");
                            System.out.println(Thread.currentThread().getName() + " start wait");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + " end wait");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        static class NotifyThread extends Thread {
            private final Object lock;

            public NotifyThread(Object lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " Enter code block");
                    System.out.println(Thread.currentThread().getName() + " start notify");
                    // Record that the notification was sent, then notify
                    isWait = false;
                    lock.notifyAll();
                    System.out.println(Thread.currentThread().getName() + " end notify");
                }
            }
        }
    }
This version only adds the isWait state variable. NotifyThread sets it to false when it sends the notification, and WaitThread checks it before calling wait(). In this example isWait is already false by the time WaitThread runs, so the body of the while loop is skipped and wait() is never called. This avoids the lost notification caused by notifying early.
Summary:
When using the wait/notify mechanism, pair it with a boolean variable (or any other condition that can be evaluated as true or false). Change the variable before calling notify() so that the waiting thread can leave its while loop after wait() returns, and wrap the wait() call in a while loop that checks the variable. That way a thread whose notification was sent early, or missed, does not block in wait(), and the program stays correct.
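The flag-plus-loop rule can be packaged into a small reusable class. The sketch below is one possible shape, not the only one; the names `Gate`, `open`, `awaitOpen` and `lateWaiterReturns` are hypothetical, and the 2-second join is only a safety bound for the demonstration.

```java
class EarlyNotifyGateDemo {

    static class Gate {
        private boolean open = false; // guarded by "this"

        synchronized void open() {
            open = true;  // change the state first...
            notifyAll();  // ...then notify whoever is already waiting
        }

        synchronized void awaitOpen() throws InterruptedException {
            while (!open) { // skipped entirely if open() has already run
                wait();
            }
        }
    }

    // Opens the gate BEFORE the waiting thread starts, then reports whether that thread still finishes.
    static boolean lateWaiterReturns() {
        Gate gate = new Gate();
        gate.open(); // the "early" notification
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitOpen();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join(2000); // upper bound only; returns as soon as the thread ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("late waiter returned: " + lateWaiterReturns());
    }
}
```

Keeping the flag and the lock inside one class means callers cannot forget to check the flag or to hold the lock.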
2) The wait condition changes
If a thread is notified while waiting, but the condition it was waiting for changes again before it resumes, and the thread does not recheck the condition, the program will also go wrong.
The following example illustrates this situation:
    import java.util.ArrayList;
    import java.util.List;

    public class ConditionChange {
        // The list is both the shared data and the lock object
        private static final List<String> lockObject = new ArrayList<>();

        public static void main(String[] args) {
            Consumer consumer1 = new Consumer(lockObject);
            Consumer consumer2 = new Consumer(lockObject);
            Productor productor = new Productor(lockObject);
            consumer1.start();
            consumer2.start();
            productor.start();
        }

        static class Consumer extends Thread {
            private final List<String> lock;

            public Consumer(List<String> lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        // Using "if" here causes an error when the wait condition changes after wakeup
                        if (lock.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + " list Empty");
                            System.out.println(Thread.currentThread().getName() + " call wait method");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + " wait Method end");
                        }
                        String element = lock.remove(0);
                        System.out.println(Thread.currentThread().getName() + " Take out the first element as:" + element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        static class Productor extends Thread {
            private final List<String> lock;

            public Productor(List<String> lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " Start adding elements");
                    lock.add(Thread.currentThread().getName());
                    lock.notifyAll();
                }
            }
        }
    }
An exception is thrown (standard output and standard error are interleaved):
    Exception in thread "Thread-1" Thread-0 list Empty
    Thread-0 call wait method
    Thread-1 list Empty
    Thread-1 call wait method
    Thread-2 Start adding elements
    Thread-1 wait Method end
    java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
Cause analysis:
This example has three threads: Consumer1, Consumer2 and the producer. Consumer1 calls wait() first, enters the waiting state and releases the object lock. Consumer2 can therefore acquire the lock and enter the synchronized block; when it calls wait(), it releases the lock as well.
The producer can then acquire the lock, enter the synchronized block, insert an element into the list, and use notifyAll() to notify Consumer1 and Consumer2, which are both in the WAITING state.
Consumer1 acquires the lock, returns from wait(), removes the element (leaving the list empty), finishes, exits the synchronized block and releases the lock. Consumer2 then acquires the lock, returns from wait() and continues. When Consumer2 executes lock.remove(0), it fails because Consumer1 has already removed the only element and the list is empty.
Solution:
The analysis shows that Consumer2 fails because it does not recheck the wait condition after returning from wait(), even though the condition has changed by then. The fix is to check the condition again after wait() returns.
    import java.util.ArrayList;
    import java.util.List;

    public class ConditionChange {
        // The list is both the shared data and the lock object
        private static final List<String> lockObject = new ArrayList<>();

        public static void main(String[] args) {
            Consumer consumer1 = new Consumer(lockObject);
            Consumer consumer2 = new Consumer(lockObject);
            Productor productor = new Productor(lockObject);
            consumer1.start();
            consumer2.start();
            productor.start();
        }

        static class Consumer extends Thread {
            private final List<String> lock;

            public Consumer(List<String> lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    try {
                        // "while" rechecks the condition after every wakeup, so an empty list is never read
                        while (lock.isEmpty()) {
                            System.out.println(Thread.currentThread().getName() + " list Empty");
                            System.out.println(Thread.currentThread().getName() + " call wait method");
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + " wait Method end");
                        }
                        String element = lock.remove(0);
                        System.out.println(Thread.currentThread().getName() + " Take out the first element as:" + element);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        static class Productor extends Thread {
            private final List<String> lock;

            public Productor(List<String> lock) {
                this.lock = lock;
            }

            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() + " Start adding elements");
                    lock.add(Thread.currentThread().getName());
                    lock.notifyAll();
                }
            }
        }
    }
The only difference from the previous code is that the if statement around wait() has become a while loop. Now, when the list is empty after a wakeup, the thread goes back to waiting instead of going on to remove an element from the list. Note that only one element is ever produced, so one of the two consumers keeps waiting and this sample does not exit by itself.
Summary:
When using the wait/notify mechanism, call wait() inside a while loop whose condition is a boolean variable or some other true/false test (lock.isEmpty() in this article). While the condition holds, the thread stays in the loop and calls wait(); once the condition no longer holds, it leaves the loop and executes the code that follows.
3) The "feigned death" state
Phenomenon: with multiple consumers and multiple producers, using notify() can lead to "feigned death": the thread that gets woken up is of the same kind as the thread that sent the notification, so no thread can make progress.
Cause analysis: suppose several producer threads have called wait() and are blocked. When a producer that holds the object lock calls notify(), the thread it wakes may be another producer. If that keeps happening, all the producer threads end up waiting while the consumers are never woken, and the program stalls.
Solution: replace notify() with notifyAll(). If a Lock is used, replace signal() with signalAll().
Summary
Code that uses the notification mechanism provided by Object should follow these rules:
Always check the wait condition in a while loop rather than in an if statement; use notifyAll() instead of notify().
The basic usage pattern is as follows:
    // The standard idiom for calling the wait method in Java
    synchronized (sharedObject) {
        while (condition) {
            sharedObject.wait(); // (Releases lock, and reacquires on wakeup)
        }
        // do action based upon condition e.g. take or put into queue
    }
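As one way of applying this idiom, the sketch below wraps it in a small bounded buffer and pushes a known set of numbers through it. The names `BoundedBufferDemo`, `BoundedBuffer` and `transferSum`, the capacity of 2 and the choice of two producers are assumptions made for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BoundedBufferDemo {

    static class BoundedBuffer<T> {
        private final Queue<T> items = new ArrayDeque<>(); // guarded by "this"
        private final int capacity;

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(T item) throws InterruptedException {
            while (items.size() == capacity) { // recheck after every wakeup
                wait();
            }
            items.add(item);
            notifyAll(); // wake consumers (and producers, which will recheck and wait again)
        }

        synchronized T take() throws InterruptedException {
            while (items.isEmpty()) { // recheck after every wakeup
                wait();
            }
            T item = items.remove();
            notifyAll(); // wake producers waiting for free space
            return item;
        }
    }

    // Two producers each put 1..n; the calling thread takes 2n items and returns their sum.
    static long transferSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Runnable producer = () -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread p1 = new Thread(producer);
        Thread p2 = new Thread(producer);
        p1.start();
        p2.start();
        long sum = 0;
        try {
            for (int i = 0; i < 2 * n; i++) {
                sum += buffer.take();
            }
            p1.join();
            p2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + transferSum(100));
    }
}
```

Here the lock is the buffer object itself, so the waiting and notifying logic lives in one class instead of being repeated in every producer and consumer.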
**Implementing producer-consumer with wait/notifyAll**
The producer and consumer code using wait/notifyAll is as follows:
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ProductorConsumer {

        public static void main(String[] args) {
            // The list is both the shared data area and the lock object
            List<Integer> linkedList = new LinkedList<>();
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(linkedList, 8));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(linkedList));
            }
        }

        static class Productor implements Runnable {
            private final List<Integer> list;
            private final int maxLength;

            public Productor(List<Integer> list, int maxLength) {
                this.list = list;
                this.maxLength = maxLength;
            }

            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            // Wait while the shared data area is full
                            while (list.size() == maxLength) {
                                System.out.println("producer " + Thread.currentThread().getName() + " list is at maximum capacity, wait");
                                list.wait();
                                System.out.println("producer " + Thread.currentThread().getName() + " exit wait");
                            }
                            Random random = new Random();
                            int i = random.nextInt();
                            System.out.println("producer " + Thread.currentThread().getName() + " production data " + i);
                            list.add(i);
                            list.notifyAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }

        static class Consumer implements Runnable {
            private final List<Integer> list;

            public Consumer(List<Integer> list) {
                this.list = list;
            }

            @Override
            public void run() {
                while (true) {
                    synchronized (list) {
                        try {
                            // Wait while the shared data area is empty
                            while (list.isEmpty()) {
                                System.out.println("consumer " + Thread.currentThread().getName() + " list is empty, wait");
                                list.wait();
                                System.out.println("consumer " + Thread.currentThread().getName() + " exit wait");
                            }
                            Integer element = list.remove(0);
                            System.out.println("consumer " + Thread.currentThread().getName() + " consumption data: " + element);
                            list.notifyAll();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
Output results:
    producer pool-1-thread-1 production data -232820990
    producer pool-1-thread-1 production data 1432164130
    producer pool-1-thread-1 production data 1057090222
    producer pool-1-thread-1 production data 1201395916
    producer pool-1-thread-1 production data 482766516
    producer pool-1-thread-1 list is at maximum capacity, wait
    consumer pool-1-thread-15 exit wait
    consumer pool-1-thread-15 consumption data: 1237535349
    consumer pool-1-thread-15 consumption data: -1617438932
    consumer pool-1-thread-15 consumption data: -535396055
    consumer pool-1-thread-15 consumption data: -232820990
    consumer pool-1-thread-15 consumption data: 1432164130
    consumer pool-1-thread-15 consumption data: 1057090222
    consumer pool-1-thread-15 consumption data: 1201395916
    consumer pool-1-thread-15 consumption data: 482766516
    consumer pool-1-thread-15 list is empty, wait
    producer pool-1-thread-5 exit wait
    producer pool-1-thread-5 production data 1442969724
    producer pool-1-thread-5 production data 1177554422
    producer pool-1-thread-5 production data -133137235
    producer pool-1-thread-5 production data 324882560
    producer pool-1-thread-5 production data 2065211573
    producer pool-1-thread-5 production data 253569900
    producer pool-1-thread-5 production data 571277922
    producer pool-1-thread-5 production data 1622323863
    producer pool-1-thread-5 list is at maximum capacity, wait
    consumer pool-1-thread-10 exit wait
Implementing producer-consumer with await/signalAll of a Lock's Condition
Mirroring the wait and notify/notifyAll methods of Object, Condition provides equivalent methods:
1. Counterparts of wait
void await() throws InterruptedException: the current thread enters the waiting state. It returns from await() once another thread calls signal() or signalAll() on the condition and the current thread has reacquired the Lock; if it is interrupted while waiting, InterruptedException is thrown;
long awaitNanos(long nanosTimeout) throws InterruptedException: the current thread enters the waiting state until it is notified, interrupted or timed out;
boolean await(long time, TimeUnit unit) throws InterruptedException: the same as awaitNanos, but the timeout is given with a time unit;
boolean awaitUntil(Date deadline) throws InterruptedException: the current thread enters the waiting state until it is notified, interrupted or the deadline passes.
2. Counterparts of notify
void signal(): wakes up one thread waiting on the condition and moves it from the wait queue to the synchronization queue; once the thread wins the Lock there, it returns from its await method.
void signalAll(): unlike signal(), it wakes up all threads waiting on the condition.
In other words, wait corresponds to await, and notify corresponds to signal.
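The timed variants are easiest to understand in use. The following sketch waits for a flag with an overall timeout by feeding the remaining time returned by awaitNanos back into the loop. The class `TimedAwaitDemo`, its `ready` flag and the timeout values are assumptions made for the example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TimedAwaitDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition readyCondition = lock.newCondition();
    private boolean ready = false; // guarded by lock

    void markReady() {
        lock.lock();
        try {
            ready = true;
            readyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits until ready is true or the timeout elapses; returns false on timeout.
    boolean awaitReady(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!ready) {
                if (nanos <= 0L) {
                    return false; // time is up and the condition still does not hold
                }
                nanos = readyCondition.awaitNanos(nanos); // returns the remaining time
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Nobody signals, so the wait must end by timeout.
    static boolean timesOutWithoutSignal() {
        try {
            return !new TimedAwaitDemo().awaitReady(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Another thread signals, so the wait ends well before the 5-second limit.
    static boolean returnsAfterSignal() {
        TimedAwaitDemo demo = new TimedAwaitDemo();
        new Thread(demo::markReady).start();
        try {
            return demo.awaitReady(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("times out without signal: " + timesOutWithoutSignal());
        System.out.println("returns after signal:     " + returnsAfterSignal());
    }
}
```

As with wait(), the await call sits inside a while loop, so a wakeup that arrives while the condition is still false simply continues waiting with the time that is left.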
Solving the producer-consumer problem with a Lock's Condition follows the same principle as wait/notifyAll. Here is the code:
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class ProductorConsumer {
        private static final ReentrantLock lock = new ReentrantLock();
        // Producers wait on "full" while the list is full
        private static final Condition full = lock.newCondition();
        // Consumers wait on "empty" while the list is empty
        private static final Condition empty = lock.newCondition();

        public static void main(String[] args) {
            List<Integer> linkedList = new LinkedList<>();
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(linkedList, 8, lock));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(linkedList, lock));
            }
        }

        static class Productor implements Runnable {
            private final List<Integer> list;
            private final int maxLength;
            private final Lock lock;

            public Productor(List<Integer> list, int maxLength, Lock lock) {
                this.list = list;
                this.maxLength = maxLength;
                this.lock = lock;
            }

            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.size() == maxLength) {
                            System.out.println("producer " + Thread.currentThread().getName() + " list is at maximum capacity, wait");
                            full.await();
                            System.out.println("producer " + Thread.currentThread().getName() + " exit wait");
                        }
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("producer " + Thread.currentThread().getName() + " production data " + i);
                        list.add(i);
                        // The list is no longer empty: wake the waiting consumers
                        empty.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        static class Consumer implements Runnable {
            private final List<Integer> list;
            private final Lock lock;

            public Consumer(List<Integer> list, Lock lock) {
                this.list = list;
                this.lock = lock;
            }

            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        while (list.isEmpty()) {
                            System.out.println("consumer " + Thread.currentThread().getName() + " list is empty, wait");
                            empty.await();
                            System.out.println("consumer " + Thread.currentThread().getName() + " exit wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("consumer " + Thread.currentThread().getName() + " consumption data: " + element);
                        // The list is no longer full: wake the waiting producers
                        full.signalAll();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
    }
Output results:
    consumer pool-1-thread-9 consumption data: 1146627506
    consumer pool-1-thread-9 consumption data: 1508001019
    consumer pool-1-thread-9 consumption data: -600080565
    consumer pool-1-thread-9 consumption data: -1000305429
    consumer pool-1-thread-9 consumption data: -1270658620
    consumer pool-1-thread-9 consumption data: 1961046169
    consumer pool-1-thread-9 consumption data: -307680655
    consumer pool-1-thread-9 list is empty, wait
    consumer pool-1-thread-13 exit wait
    consumer pool-1-thread-13 list is empty, wait
    consumer pool-1-thread-10 exit wait
    producer pool-1-thread-5 exit wait
    producer pool-1-thread-5 production data -892558288
    producer pool-1-thread-5 production data -1917220008
    producer pool-1-thread-5 production data 2146351766
    producer pool-1-thread-5 production data 452445380
    producer pool-1-thread-5 production data 1695168334
    producer pool-1-thread-5 production data 1979746693
    producer pool-1-thread-5 production data -1905436249
    producer pool-1-thread-5 production data -101410137
    producer pool-1-thread-5 list is at maximum capacity, wait
    producer pool-1-thread-1 exit wait
    producer pool-1-thread-1 list is at maximum capacity, wait
    producer pool-1-thread-4 exit wait
    producer pool-1-thread-4 list is at maximum capacity, wait
    producer pool-1-thread-2 exit wait
    producer pool-1-thread-2 list is at maximum capacity, wait
    producer pool-1-thread-3 exit wait
    producer pool-1-thread-3 list is at maximum capacity, wait
    consumer pool-1-thread-9 exit wait
    consumer pool-1-thread-9 consumption data: -892558288
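Because producers and consumers wait on separate Condition objects here, a signal sent to one Condition can only reach threads of the other kind. The sketch below is a variation on the code above, not the author's version: it encapsulates the lock in a buffer class and uses signal() instead of signalAll(). The names `ConditionBufferDemo`, `notFull`, `notEmpty` and `transferSum` are chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBufferDemo {

    static class BoundedBuffer<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();  // producers wait here
        private final Condition notEmpty = lock.newCondition(); // consumers wait here
        private final Queue<T> items = new ArrayDeque<>();      // guarded by lock
        private final int capacity;

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (items.size() == capacity) {
                    notFull.await();
                }
                items.add(item);
                notEmpty.signal(); // one new item, so waking one consumer is enough
            } finally {
                lock.unlock();
            }
        }

        T take() throws InterruptedException {
            lock.lock();
            try {
                while (items.isEmpty()) {
                    notEmpty.await();
                }
                T item = items.remove();
                notFull.signal(); // one free slot, so waking one producer is enough
                return item;
            } finally {
                lock.unlock();
            }
        }
    }

    // Two producers each put 1..n; the calling thread takes 2n items and returns their sum.
    static long transferSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Runnable producer = () -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread p1 = new Thread(producer);
        Thread p2 = new Thread(producer);
        p1.start();
        p2.start();
        long sum = 0;
        try {
            for (int i = 0; i < 2 * n; i++) {
                sum += buffer.take();
            }
            p1.join();
            p2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + transferSum(100));
    }
}
```

With a single shared wait set, as with Object.wait(), this targeted wakeup is not possible, which is why notifyAll() is the safer default there.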
Implementing producer-consumer with BlockingQueue
BlockingQueue's internal implementation adds two blocking operations to an ordinary queue.
When the queue is full, a thread inserting data is blocked until the queue is no longer full; when the queue is empty, a thread taking data is blocked until the queue is no longer empty.
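Both limits can be observed without hanging a thread by using the timed variants offer(e, timeout, unit) and poll(timeout, unit), which give up after the timeout instead of blocking indefinitely as put() and take() do. This is a minimal sketch; the class name, the capacity of 1 and the 50 ms timeout are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueLimitsDemo {

    // A timed insert into a full queue waits, then gives up and returns false.
    static boolean offerToFullQueueTimesOut() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put(1); // the queue is now full
            return !queue.offer(2, 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A timed take from an empty queue waits, then gives up and returns null.
    static boolean pollFromEmptyQueueTimesOut() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer to full queue timed out:  " + offerToFullQueueTimesOut());
        System.out.println("poll from empty queue timed out: " + pollFromEmptyQueueTimesOut());
    }
}
```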
BlockingQueue can therefore serve as the shared data area by itself, and it handles the cooperation between producer and consumer threads well.
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProductorConsumer {
        // No capacity argument: the queue holds up to Integer.MAX_VALUE elements,
        // so put() practically never blocks. Pass a capacity to bound it.
        private static final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        public static void main(String[] args) {
            ExecutorService service = Executors.newFixedThreadPool(15);
            for (int i = 0; i < 5; i++) {
                service.submit(new Productor(queue));
            }
            for (int i = 0; i < 10; i++) {
                service.submit(new Consumer(queue));
            }
        }

        static class Productor implements Runnable {
            private final BlockingQueue<Integer> queue;

            public Productor(BlockingQueue<Integer> queue) {
                this.queue = queue;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        Random random = new Random();
                        int i = random.nextInt();
                        System.out.println("producer " + Thread.currentThread().getName() + " production data " + i);
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        static class Consumer implements Runnable {
            private final BlockingQueue<Integer> queue;

            public Consumer(BlockingQueue<Integer> queue) {
                this.queue = queue;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        Integer element = queue.take(); // blocks while the queue is empty
                        System.out.println("consumer " + Thread.currentThread().getName() + " consuming data " + element);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
Output results:
    consumer pool-1-thread-7 consuming data 1520577501
    producer pool-1-thread-4 production data -127809610
    consumer pool-1-thread-8 consuming data 504316513
    producer pool-1-thread-2 production data 1994678907
    consumer pool-1-thread-11 consuming data 1967302829
    producer pool-1-thread-1 production data 369331507
    consumer pool-1-thread-9 consuming data 1994678907
    producer pool-1-thread-2 production data -919544017
    consumer pool-1-thread-12 consuming data -127809610
    producer pool-1-thread-4 production data 1475197572
    consumer pool-1-thread-14 consuming data -893487914
    producer pool-1-thread-3 production data 906921688
    consumer pool-1-thread-6 consuming data -1292015016
    producer pool-1-thread-5 production data -652105379
    producer pool-1-thread-5 production data -1622505717
    producer pool-1-thread-3 production data -1350268764
    consumer pool-1-thread-7 consuming data 906921688
    producer pool-1-thread-4 production data 2091628867
    consumer pool-1-thread-13 consuming data 1475197572
    consumer pool-1-thread-15 consuming data -919544017
    producer pool-1-thread-2 production data 564860122
    producer pool-1-thread-2 production data 822954707
    consumer pool-1-thread-14 consuming data 564860122
    consumer pool-1-thread-10 consuming data 369331507
    producer pool-1-thread-1 production data -245820912
    consumer pool-1-thread-6 consuming data 822954707
    producer pool-1-thread-2 production data 1724595968
    producer pool-1-thread-2 production data -1151855115
    consumer pool-1-thread-12 consuming data 2091628867
    producer pool-1-thread-4 production data -1774364499
    producer pool-1-thread-4 production data 2006106757
    consumer pool-1-thread-14 consuming data -1774364499
    producer pool-1-thread-3 production data -1070853639
    consumer pool-1-thread-9 consuming data -1350268764
    consumer pool-1-thread-11 consuming data -1622505717
    producer pool-1-thread-5 production data 355412953
As you can see, implementing producer-consumer with BlockingQueue is very simple, because the blocking insert and take operations of BlockingQueue do the coordination for us.
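The BlockingQueue example runs until the process is killed. One common way to let such a pipeline finish, which is an addition here and not part of the code above, is to send a special end-of-stream value (often called a "poison pill") after the real data. The sketch below assumes a bounded queue of capacity 4 and uses Integer.MIN_VALUE as a hypothetical marker; all names are illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {
    // Hypothetical end-of-stream marker; it must never occur as real data.
    private static final Integer POISON = Integer.MIN_VALUE;

    // Produces 1..n on the calling thread, consumes on another thread, returns the consumed sum.
    static long produceAndConsume(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        long[] sum = {0}; // written by the consumer, read after join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer value = queue.take(); // blocks while the queue is empty
                    if (value.equals(POISON)) {
                        break; // the producer is done
                    }
                    sum[0] += value;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= n; i++) {
                queue.put(i); // blocks while the queue is full
            }
            queue.put(POISON); // tell the consumer to stop
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum = " + produceAndConsume(100));
    }
}
```

With several consumers, one marker per consumer would be needed so that each of them sees its own stop signal.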
That concludes the summary of the three ways to implement producer-consumer. I hope it helps; if you find any mistakes, please point them out.