The producer consumer pattern is the most common pattern among multi-threading: the producer thread (one or more) generates bread and puts it into the basket (set or array), and at the same time, the consumer thread (one or more) takes out the bread from the basket (set or array) and consumes it. Although they have different tasks, the resources they process are the same, which reflects a method of inter-thread communication.
This article will first explain the situation of single producers and single consumers, and then explain the situation of multi-producer and multi-consumer model. These two modes will also be implemented using the wait()/nofity()/nofityAll() mechanism and the lock()/unlock() mechanism respectively.
Before starting the introduction of the pattern, explain the usage details of the wait(), notify() and notifyAll() methods as well as the improved usage of lock()/unlock(), await()/signal()/signalAll().
1. The principle of waiting and wake up mechanism
wait(), notify() and notifyAll() respectively represent threads that enter sleep, wake up the sleep thread, and wake up all sleeping threads. But, which thread is the object? In addition, all three methods described in the API documentation must be used under the premise of a valid monitor (which can be understood as holding a lock). What does these three methods have to do with locking?
Taking the synchronization code block synchronized(obj){} or synchronization functions as an example, wait(), notify(), and notifyAll() can be used in their code structure because they all hold locks.
For the following two synchronization code blocks, lock obj1 and lock obj2 are used respectively. Thread 1 and thread 2 execute the synchronization code corresponding to obj1, and thread 3 and thread 4 execute the synchronization code corresponding to obj2.
class MyLock implements Runnable { public int flag = 0; Object obj1 = new Object(); Object obj2 = new Object(); public void run(){ while(true){ if(flag%2=0){ synchronized(obj1){ //Threads t1 and t2 perform this synchronization task//try{obj1.wait();}catch(InterruptedException i){} //obj1.notify() //obj1.notifyAll() } } else { synchronized(obj2){ //Thread t3 and t4 perform this synchronization task//try{obj2.wait();}catch(InterruptedException i){} //obj2.notify() //obj2.notifyAll() } } } } } } } }}class Demo { public static void main(String[] args){ MyLock ml = new MyLock(); Thread t1 = new Thread(ml); Thread t2 = new Thread(ml); Thread t3 = new Thread(ml); Thread t4 = new Thread(ml); t1.start(); t2.start(); try{Thread.sleep(1)}catch(InterruptedException i){}; ml.flag++; t3.start(); t4.start(); }}When t1 starts to execute to wait(), it will enter a sleep state, but it is not a normal sleep, but sleeps in a thread pool identified by obj1 (actually the monitor corresponds to the thread pool, but the monitor and lock are bound together at this time). When t2 starts executing, it finds that lock obj1 is held by other threads and it will enter a sleep state. This time it is because the lock resource is waiting rather than the sleep entered by wait(). Because t2 has already determined that it is applying for the obj1 lock, it will also enter the obj1 thread pool sleep, rather than ordinary sleep. Similarly, t3 and t4, these two threads will enter the obj2 thread pool to sleep.
When a thread executes to notify(), this notify() will randomly wake up any thread in the thread pool corresponding to its lock. For example, obj1.notify() will wake up any sleeping thread in the obj1 thread pool (of course, if there is no sleeping thread, then nothing will be done). Similarly, notifyAll() wakes up all sleeping threads in the corresponding thread pool of the lock.
What you must figure out is the "corresponding lock", because the lock must be specified explicitly when calling wait(), notify() and notifyAll(). For example, obj1.wait(). If the lock belongs to it is omitted, it means this object, that is, the prefixes of these three methods can only be omitted in non-static synchronization functions.
In short, when synchronization is used, lock is used, and the thread has a home, and all its basis is determined by the belonging lock. For example, when thread synchronization, it determines whether the lock is idle to decide whether to execute the subsequent code, and also determines whether to go to a specific thread pool to sleep. When awakening, it will only wake up the thread in the thread pool corresponding to the lock.
In application of these methods, generally in a task, wait() and notify()/notifyAll() appear in pairs and execute one by one. In other words, during this round of atomic synchronous execution, either wait() is executed to sleep, or notify() is executed to wake up the sleep thread in the thread pool. To achieve selective execution, you can consider using marking as the basis for judgment. Refer to the following examples.
2.Lock and Condition
The three methods of the wait() series are very limited because both sleep and wake up actions are completely coupled with the lock. For example, the thread associated with the lock obj1 can only wake up the thread in the obj1 thread pool, but cannot wake up the thread associated with the lock obj2; for example, when synchronized synchronization was originally synchronized, the lock was implicitly automatically acquired when the synchronization started, and after the entire task was executed, it implicitly automatically released the lock, which means that the action of acquiring the lock and releasing the lock could not be controlled manually.
Starting from JDK 1.5, java provides the java.util.concurrent.locks package, which provides the Lock interface, Condition interface and ReadWriteLock interface. The first two interfaces decouple the lock and monitor methods (sleep, wake-up operations). The Lock interface only provides locks. Through the lock method newConditon(), one or more monitors associated with the lock can be generated. Each monitor has its own sleep and wake-up methods. In other words, Lock replaces the use of synchronized methods and synchronized code blocks, and Condition replaces the use of Object monitor methods.
As shown in the figure below:
When a thread executes condition1.await(), the thread will enter the thread pool corresponding to the condition1 monitor to sleep. When condition1.signal() is executed, any thread in the condition1 thread pool will be randomly awakened. When condition1.signalAll() is executed, all threads in the condition1 thread pool will be awakened. Similarly, the same is true for the condition2 monitor.
Even if there are multiple monitors, as long as they are associated with the same lock object, the other thread can be operated across the monitor. For example, a thread in condition1 can execute condition2.signal() to wake up a thread in the condition2 thread pool.
To use this way of association of locks and monitors, refer to the following steps:
import java.util.concurrent.locks.*;Lock l = new ReentrantLock();Condition con1 = l.newCondition();condition con2 = l.newCondition();l.lock();try{ //Code segment containing await(), signal() or signalAll()...} finally { l.unlock(); // Since the code segment may be abnormal, unlock() must be executed, try must be used and unlock() must be put into the finally segment}For specific usage, please see the example code for Lock and condition later.
3. Single producer single consumer model
A producer thread, a consumer thread. For each bread produced by the producer, put it on the plate, the consumer takes out the bread from the plate for consumption. The basis for producers to judge whether to continue production is that there is no bread on the plate, while the basis for consumers to judge whether to consume is that there is bread on the plate. Since in this mode, only one loaf of bread is always placed on the plate, the plate can be omitted and the producer and the consumer can hand over the bread step by step.
First, we need to describe these three categories: one is the resource operated by multiple threads (here is bread), the second is the producer, and the third is the consumer. In the following example, I encapsulate the methods of producing bread and consuming bread into the producer and consumer classes respectively, which is easier to understand if they are encapsulated in the bread class.
//Description resource: The name and number of bread, determined by the number of bread class Bread { public String name; public int count = 1; public boolean flag = false; //This mark provides judgment marks for wait() and notify()}//The bread resource processed by the producer and the consumer are the same. To ensure this, //The bread class can be designed according to the singleton pattern, or the same bread object can be passed to the producer and the consumer through the construction method. The latter method is used here. //Describe the producer class Producer implements Runnable { private Bread b; //Member of the producer: the resource it wants to process Producer(Bread b){ this.b = b; } //Providing a method for producing bread public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ synchronized(Bread.class){ //Use Bread.class as the lock identifier so that the synchronized code blocks of producers and consumers can use the same lock if(b.flag){ //wait() must be inside the synchronous code block, not only because the lock must be held to sleep, but there will be confusion in the judgment of the lock resource try{Bread.class.wait();}catch(InterruptedException i){} } produce("Bread"); System.out.println(Thread.currentThread().getName()+"-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- //Notify() must also be synchronized, otherwise the lock has been released, and the wake-up action cannot be performed //ps: In a synchronization task, wait() and notify() should only be executed, otherwise the other party's thread will be confused} } }}//Describe the consumer class Consumer implements Runnable { private Bread b; //Member of the consumer: the resource it wants to process Consumer(Bread b){ this.b = b; } //Method of providing consumption bread public String consumption(){ return b.name; } public void run(){ while(true){ synchronized(Bread.class){ if(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- //2. Create producer and consumer objects, and pass the same bread object to producer and consumer Producer pro = new Producer(b); Consumer con = new Consumer(b); //3. Create thread object Thread pro_t = new Thread(pro); Thread con_t = new Thread(con); pro_t.start(); con_t.start(); }}The final execution result should be produced and consumed, and this is a continuous cycle. as follows:
Thread-0---Producer----Bread1Thread-1---Consumer-------Bread1Thread-0---Producer----Bread2Thread-1--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
4. Use Lock and Condition to realize single production and consumption model
The code is as follows:
import java.util.concurrent.locks.*;class Bread { public String name; public int count = 1; public boolean flag = false; //Provide the same lock object and the same Condition object for producers and consumers public static Lock lock = new ReentrantLock(); public static Condition condition = lock.newCondition();}class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ //Use Bread.lock to lock the resource Bread.lock.lock(); try{ if(b.flag){ try{Bread.condition.await();}catch(InterruptedException i){} } produce("Bread"); System.out.println(Thread.currentThread().getName()+"----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Bread.condition.signal(); } finally { Bread.lock.unlock(); } } }}class Consumer implements Runnable { private Bread b; Consumer(Bread b){ this.b = b; } public String consumption(){ return b.name; } public void run(){ while(true){ //Use Bread.lock to lock the resource Bread.lock.lock(); try{ if(!b.flag){ try{Bread.condition.await();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- //2. Create producer and consumer objects, and pass the same bread object to producer and consumer Producer pro = new Producer(b); Consumer con = new Consumer(b); //3. Create thread object Thread pro_t = new Thread(pro); Thread con_t = new Thread(con); pro_t.start(); con_t.start(); }}5. Multi-production and consumption model (single bread)
Here we first explain the model of multiple producers and multiple consumers, but at most one bread at the same time. This model may not be ideal in reality, but in order to lead to the real multi-production and multiple consumption model later, I think it is necessary to explain this model here and analyze this model and how it evolved from the single-production and single consumption code.
As shown in the figure below:
From single production and single consumption to multiple production and multiple consumption, due to multi-thread safety issues and deadlock issues, there are two issues that need to be considered:
For one party, how can multi-threading achieve the same production or consumption capacity as single-threading? In other words, how to make multi-threading look single-threading. The biggest difference between multi-threading and single-threading is multi-threading safety issues. Therefore, as long as you ensure that tasks executed by multi-threading can be synchronized.
The first question considers the problem of multi-threading on one party, and the second question considers how the two parties can cooperate harmoniously to complete production and consumption. That is, how to ensure that one side of the producer and the consumer is sleeping while the other side is active. Just wake up the other party when one party has finished performing the synchronization task.
In fact, from single thread to multi-threading, there are two issues that need to be considered: out-of-synchronization and deadlock. (1) When both the producer and the consumer side have multi-threads, the multi-threads of the producer can be regarded as a thread as a whole, and the multi-threads of the consumer side also as a whole, which solves the thread safety problem. (2) Combining the whole production and the whole consumer is regarded as multi-threading to solve the deadlock problem. The way to solve the deadlock in Java is to wake up the other party or wake up everything.
The question is how to ensure synchronization between multiple threads of a certain party? The code of a single consumer is analyzed by multi-threaded execution as an example.
while(true){ synchronized(Bread.class){ if(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------Suppose that consumption thread 1 wakes up consumption thread 2 after consuming a loaf of bread, and continues to loop, judge if(!flag), it will wait, and the lock is released. Assuming that the CPU just selects consumer thread 2, consumer thread 2 will also enter wait. When the producer produces a loaf of bread, suppose that consumption thread 1 is awakened, it will continue to consume the newly produced bread from the wait statement, suppose that consumption thread 2 is awakened again. When consumption thread 2 is selected by the CPU, consumption thread 2 will also consume downward from the wait statement, and the bread that was just produced is consumed. The problem arises again. Continuously awakened consumption threads 1 and 2 consume the same bread, which means that the bread is repeatedly consumed. This is another multi-threaded out-sync issue.
After talking about it for a long time, it is actually very simple to analyze after enlarging the line of sight. As long as the two or more threads of one party wait for judgment b.flag, then the two or more threads may be continuously awakened and continue to be produced or consumed downward. This creates the problem of multi-threading out-synchronization.
The problem of insecurity lies in the fact that multiple threads on the same party continue to produce or consume downwards after continuous awakening. This is caused by the if statement. If the wait thread can turn back to determine whether b.flag is true after wakeup, it can make it decide whether to continue waiting or production or consumption downward.
You can replace the if statement with a while statement to meet the requirements. In this way, regardless of whether multiple threads on a certain party are continuously awakened, they will turn back to judge b.flag.
while(true){ synchronized(Bread.class){ while(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------The first multithreaded safety issue was solved, but deadlock problems occurred. This is easy to analyze. The producer is regarded as a whole and the consumer is also a whole. When the producer's threads are all waiting (the production party's threads are continuously awakened, all the threads of the party will wait), and the consumer is also waiting, and the deadlock will appear. In fact, if you look at it in amplified way, the producer and consumer are regarded as one thread respectively. These two threads form multiple threads. When one side waits and cannot wake the other side, the other side will definitely wait, so it will be deadlocked.
For the problem of deadlock between both parties, as long as you ensure that the other party can be awakened, rather than the one party's continuous awakening, it can be solved. Just use notifyAll() or signalAll(), or you can wake up the other thread through signal() to solve the problem. See the second code below.
According to the above analysis, if the code of single production and single consumption model is improved, it can be changed to a multi-production and multi-consumption single bread model.
//Code segment 1class Bread { public String name; public int count = 1; public boolean flag = false; }//Describe the producer class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ synchronized(Bread.class){ while(b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } produce("Bread"); System.out.println(Thread.currentThread().getName()+"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ String consumption(){ return b.name; } public void run(){ while(true){ synchronized(Bread.class){ while(!b.flag){ try{Bread.class.wait();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- }}public class ProduceConsume_5 { public static void main(String[] args) { //1. Create resource object Bread b = new Bread(); //2. Create producer and consumer object Producer pro = new Producer(b); Consumer con = new Consumer(b); //3. Create thread object Thread pro_t1 = new Thread(pro); //Production thread 1 Thread pro_t2 = new Thread(pro); //Production thread 2 Thread con_t1 = new Thread(con); //Consumer thread 1 Thread con_t2 = new Thread(con); //Consumer thread 2 pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); }}The following is the code refactored using Lock and Conditon, using signal() to wake up the other thread.
//Code segment 2import java.util.concurrent.locks.*;class Bread { public String name; public int count = 1; public boolean flag = false; public static Lock lock = new ReentrantLock(); public static Condition pro_con = lock.newCondition(); public static Condition con_con = lock.newCondition();}//Describe the producer class Producer implements Runnable { private Bread b; Producer(Bread b){ this.b = b; } public void produce(String name){ b.name = name + b.count; b.count++; } public void run(){ while(true){ Bread.lock.lock(); try{ while(b.flag){ try{Bread.pro_con.await();}catch(InterruptedException i){} } produce("Bread"); System.out.println(Thread.currentThread().getName()+"---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Bread.con_con.signal(); //Wake up the consumer thread} finally { Bread.lock.unlock(); } } }}//Describe the consumer class Consumer implements Runnable { private Bread b; Consumer(Bread b){ this.b = b; } public String consumption(){ return b.name; } public void run(){ while(true){ Bread.lock.lock(); try{ while(!b.flag){ try{Bread.con_con.await();}catch(InterruptedException i){} } System.out.println(Thread.currentThread().getName()+"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- //2. Create producer and consumer objects Producer pro = new Producer(b); Consumer con = new Consumer(b); //3. Create thread object Thread pro_t1 = new Thread(pro); Thread pro_t2 = new Thread(pro); Thread con_t1 = new Thread(con); Thread con_t2 = new Thread(con); pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); }}Let’s summarize the issues of more production and more consumption:
(1). The solution to the multi-threading out-synchronization of a certain party is to use while(flag) to determine whether waiting;
(2). The solution to the deadlock problem of both parties is to wake up the other party. You can use notifyAll(), signalAll() or the signal() method of the other party's monitor.
6. More production and consumption models
There are multiple producer threads and multiple consumer threads. The producer puts the produced bread into a basket (set or array), and the consumer takes the bread out of the basket. The basis for producers to judge continued production is that the basket is full, and the basis for consumers to judge continued consumption is whether the basket is empty. In addition, when the consumer takes out the bread, the corresponding position becomes empty again, and the producer can turn back and continue production from the starting position of the basket, which can be achieved by resetting the pointer of the basket.
In this model, in addition to describing producers, consumers, and bread, it is also necessary to describe the basket container. Suppose that an array is used as a container, every time the producer produces one, the production pointer shifts backwards, and every time the consumer consumes one, the consumption pointer shifts backwards.
The code is as follows: You can refer to the example code given in the API-->Condition class
import java.util.concurrent.locks.*;class Basket { private Bread[] arr; //the size of basket Basket(int size){ arr = new Bread[size]; } //the pointer of in and out private int in_ptr,out_ptr; //how many breads left in basket private int left; private Lock lock = new ReentrantLock(); private Condition full = lock.newCondition(); private Condition empty = lock.newCondition(); //bread into basket public void in(){ lock.lock(); try{ while(left == arr.length){ try{full.await();} catch (InterruptedException i) {i.printStackTrace();} } arr[in_ptr] = new Bread("MianBao",Producer.num++); System.out.println("Put the bread: "+arr[in_ptr].getName()+"-------into basket["+in_ptr+"]"); left++; if(++in_ptr == arr.length){in_ptr = 0;} empty.signal(); } finally { lock.unlock(); } } //bread out from basket public Bread out(){ lock.lock(); try{ while(left == 0){ try{empty.await();} catch (InterruptedException i) {i.printStackTrace();} } Bread out_bread = arr[out_ptr]; System.out.println("Get the bread: "+out_bread.getName()+"---------- from basket["+out_ptr+"]"); left--; if(++out_ptr == arr.length){out_ptr = 0;} full.signal(); return out_bread; } finally { lock.unlock(); } }}class Bread { private String name; Bread(String name,int num){ this.name = name + num; } public String getName(){ return this.name; }}class Producer implements Runnable { private Basket basket; public static int num = 1; //the first number for Bread's name Producer(Basket b){ this.basket = b; } public void run(){ while(true) { basket.in(); try{Thread.sleep(10);}catch(InterruptedException i){} } }}class Consumer implements Runnable { private Basket basket; private Bread i_get; Consumer(Basket b){ this.basket = b; } public void run(){ while(true){ i_get = basket.out(); try{Thread.sleep(10);}catch(InterruptedException i){} } }}public class ProduceConsume_7 { public static void main(String[] args) { Basket b = new Basket(20); // the basket size = 20 Producer pro = new Producer(b); Consumer con = new Consumer(b); Thread pro_t1 = new Thread(pro); Thread pro_t2 = new Thread(pro); Thread con_t1 = new Thread(con); Thread con_t2 = new Thread(con); Thread con_t3 = new Thread(con); pro_t1.start(); pro_t2.start(); con_t1.start(); con_t2.start(); con_t3.start(); }}This involves consumers, producers, bread and baskets, where bread and baskets are resources operated by multiple threads. The producer thread produces bread and puts it into the basket, and the consumer thread takes out the bread from the basket. The ideal code is to encapsulate both production tasks and consumption tasks in the resource class. Because bread is an element of the basket container, it is not suitable for packaging into the bread class, and packaging into the basket makes it easier to operate the container.
Note that you must put all codes involving resource operations inside the lock, otherwise multi-threaded out-synchronization problem will occur. For example, the method producing bread is defined in the Producer class, and then it is used as a parameter to the method basket.in() put into the basket, i.e., basket.in(producer()), which is wrong behavior because producer() is passed to the in() method after it is executed outside the lock.
The above article is based on the Java producer and consumer model (detailed analysis) and is the entire content shared by the editor. I hope it can give you a reference and I hope you can support Wulin.com more.