Generally speaking, tasks are produced faster than they are consumed. The design details, then, are the queue length and how to match the speed of production with the speed of consumption.
A typical producer-consumer model is as follows:
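A minimal sketch of such a model, using a bounded ArrayBlockingQueue (class and method names here are illustrative, not from any particular library):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    // Runs one producer and one consumer over a bounded queue and
    // returns the number of items the consumer handled.
    static int run(int total, int capacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        final int[] consumed = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take(); // blocks while the queue is empty
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed: " + run(100, 10)); // consumed: 100
    }
}
```

The bounded capacity is what matches the two speeds: a fast producer simply blocks in put until the consumer catches up.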
In general, production outpaces consumption. When the queue is full, we do not want any task to be dropped or left unexecuted. The producer could simply wait a moment before resubmitting, but a better approach is to block the producer inside the submit call and let it resume as soon as the queue has space again, so no idle time is wasted. Blocking is also easy: BlockingQueue was created for exactly this. Both ArrayBlockingQueue and LinkedBlockingQueue accept a capacity limit in their constructors, and LinkedBlockingQueue checks its capacity after acquiring the lock on each actual queue operation.
Further, when the queue is empty and consumers cannot get a task, they could wait a while before trying again, but a better approach is to block on the BlockingQueue take method, so tasks are executed as soon as they arrive. It is advisable to use the timed variant, poll with a timeout parameter, so that the consumer thread exits after the timeout. This way, once the producer has actually stopped producing, consumers will not wait forever.
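A sketch of such a consumer loop, using BlockingQueue.poll(timeout, unit) (the class and method names I introduce here are illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedConsumerDemo {
    // Drains the queue, exiting once no task arrives within the timeout.
    static int consume(BlockingQueue<Runnable> queue, long timeoutMillis)
            throws InterruptedException {
        int handled = 0;
        while (true) {
            Runnable task = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (task == null) {
                break; // producer has presumably stopped; exit instead of waiting forever
            }
            task.run();
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.put(() -> {});
        queue.put(() -> {});
        System.out.println("handled: " + consume(queue, 100)); // handled: 2
    }
}
```

Unlike take, poll returns null on timeout, which gives the consumer a natural exit condition.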
With this, we have an efficient producer-consumer model that supports blocking.
Wait: since java.util.concurrent already implements thread pools for us, why build this machinery ourselves? Isn't it more convenient to use ExecutorService directly?
Let's take a look at the basic structure of ThreadPoolExecutor:
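The constructor already shows the essential structure: core and maximum pool sizes, a keep-alive time, a work queue, and a rejection handler. A minimal construction sketch (the parameter values are arbitrary, chosen only for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                            // corePoolSize: threads kept even when idle
                4,                            // maximumPoolSize: upper bound when the queue is full
                60L, TimeUnit.SECONDS,        // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(10), // bounded work queue
                new ThreadPoolExecutor.AbortPolicy()); // default rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("task ran"));
        pool.shutdown();
    }
}
```

Note that the work queue here is a BlockingQueue, which is exactly what raises the question below.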
But the problem is that even if you pass a BlockingQueue as the work queue when constructing a ThreadPoolExecutor, the execute method will not block when the queue is full. The reason is that ThreadPoolExecutor calls the queue's non-blocking offer method:
The JDK source (from an older ThreadPoolExecutor implementation) is as follows:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
What we need, then, is this: when the producer submits a task and the queue is full, the producer should block until the task can be enqueued, i.e. until some task has been consumed.
The key point is that in a concurrent environment the producer cannot reliably judge whether the queue is full; in particular, calling ThreadPoolExecutor.getQueue().size() and then deciding whether to submit is a check-then-act race, since the queue's state can change between the check and the submission.
Inside the thread pool implementation, when the queue is full, the RejectedExecutionHandler passed to the constructor is invoked to handle the rejected task. The default implementation is AbortPolicy, which simply throws a RejectedExecutionException.
I won't go into all the rejection policies here. CallerRunsPolicy comes closest to our needs: when the queue is full, the thread that submitted the task runs it itself, which effectively makes the producer temporarily do the consumer's work. The producer is not blocked, but further submission is suspended while it runs the task.
The code is as follows:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
However, this policy has a hidden danger. When there are few producers, the consumers may drain the queue entirely while the producer is busy running the rejected task, leaving the queue empty; the producer can only resume producing after it finishes that task. This can starve the consumer threads.
Following a similar idea, the simplest approach is to define our own RejectedExecutionHandler that calls BlockingQueue.put when the queue is full, so that the producer blocks:
The code is as follows:
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // not expected here; restore the interrupt status just in case
                Thread.currentThread().interrupt();
            }
        }
    }
};
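Wiring this handler into a pool can be sketched as follows (the tiny pool and queue sizes are deliberate, to force frequent rejections; all names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {
    // Submits n quick tasks through a deliberately small pool and queue,
    // relying on the blocking handler instead of rejection. Returns the
    // number of tasks that actually ran.
    static int submitAll(int n) throws InterruptedException {
        RejectedExecutionHandler blockingHandler = (r, executor) -> {
            if (!executor.isShutdown()) {
                try {
                    executor.getQueue().put(r); // block the submitter until space frees up
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), // tiny queue to force frequent rejections
                blockingHandler);

        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.execute(done::incrementAndGet); // blocks instead of throwing when full
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed: " + submitAll(20)); // completed: 20
    }
}
```

Every task is executed; when the queue is full, execute simply pauses the submitting thread inside the handler's put call rather than throwing.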
This way, we no longer need to manage the queue and consumer threads ourselves. We only implement the producer logic and the task logic, and submit tasks to the thread pool.
Compared with hand-rolling the model, this approach needs less code and avoids many pitfalls of concurrent environments. Of course, other means are possible, such as using a semaphore to limit entry at submission time, but if all you want is to block the producer, that seems overly complicated.
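For completeness, the semaphore approach mentioned above can be sketched like this (a hedged illustration, not a recommendation; all names are mine):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreThrottleDemo {
    // Limits the number of in-flight tasks with a semaphore: the producer
    // must acquire a permit before submitting, and each task releases its
    // permit when done. Returns how many tasks completed.
    static int runTasks(int total, int permits) throws InterruptedException {
        Semaphore slots = new Semaphore(permits);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();

        for (int i = 0; i < total; i++) {
            slots.acquire(); // producer blocks once `permits` tasks are in flight
            pool.execute(() -> {
                try {
                    done.incrementAndGet(); // stand-in for real work
                } finally {
                    slots.release(); // free a slot whether or not the task fails
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("done: " + runTasks(50, 4)); // done: 50
    }
}
```

This bounds the backlog just like a bounded queue would, but it requires pairing every acquire with a release and is easy to get subtly wrong, which is why the custom rejection handler is the simpler choice here.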