일반적으로 생산 작업의 속도는 소비 속도보다 큽니다. 세부 사항의 문제는 대기열 길이와 생산 속도와 소비 속도와 일치하는 방법입니다.
일반적인 생산자 소비자 모델은 다음과 같습니다.
일반 생산의 경우 소비보다 빠릅니다. 대기열이 가득 차면, 우리는 현재 어떤 작업이 무시되거나 실행되지 않기를 원합니다. 프로듀서는 작업을 제출하기 전에 잠시 기다릴 수 있습니다. 대기열이 가득 차 있지 않으면 계속해서 작업을 제출하십시오. 차단도 쉽게 만들어 져 있습니다.
또한, 대기열이 비어 있으면 소비자는 작업을 받기 전에 잠시 기다릴 수 있습니다. . 타임 아웃 매개 변수가있는 과부하 메소드를 호출하는 것이 좋습니다. 이런 식으로 생산자가 실제로 생산을 중단했을 때 소비자는 무한히 기다리지 않을 것입니다.
따라서 차단을 지원하는 효율적인 생산 및 소비 모델이 구현됩니다.
잠깐만 요 ExecutorService를 직접 사용하는 것이 더 편리하지 않습니까?
ThreadPooleExecutor의 기본 구조를 살펴 보겠습니다.
그러나 문제는 ThreadPooleExexecutor를 구성 할 때 블록 큐어를 수동으로 지정하더라도 실제로 큐가 가득 차면 실행 메소드가 차단되지 않기 때문입니다.
코드 사본은 다음과 같습니다.
public void execute (runnable command) {
if (command == null)
새로운 nullpointerexception ()을 던지십시오.
if (poolsize> = corepoolsize ||! addifunderCorePoolSize (명령)) {
if (runstate == running && workqueue.offer (command)) {
if (runstate! = running || poolsize == 0)
inristqueuedtask handled (명령);
}
else if (! addifundermaximumpoolsize (명령))
Reject (명령); //가 종료되거나 포화되었습니다
}
}
현재 결과를 달성하기 위해 무언가를해야합니다. 생산자가 작업을 제출하고 대기열이 가득 차면 생산자는이를 차단하고 작업이 소비 될 때까지 기다릴 수 있습니다.
핵심은 동시 환경에서 제작자가 큐 전체를 판단 할 수 없으며 ThreadPoolexecutor.getqueue (). size ()를 호출하여 큐가 가득 찼는 지 확인할 수 없다는 것입니다.
스레드 풀의 구현에서, 대기열이 가득 차면, 건설 중에 통과 된 거부 지출 핸들러는 작업의 처리를 거부하기 위해 호출됩니다. 기본 구현은 ABORTPOLICY로, 거부 excutionEcutionException을 직접 버립니다.
여기에서 몇 가지 거부 전략에 대한 자세한 내용은 여기에서 우리의 요구에 더 가깝습니다. 사용자가 수행 한 소비. 생산자가 차단되지는 않지만 제출 작업도 중단됩니다.
코드 사본은 다음과 같습니다.
공개 정적 클래스 Callerrunspolicy는 거부 executionHandler를 구현합니다.
/**
* <tt> callerrunspolicy </tt>를 만듭니다.
*/
public callerrunspolicy () {}
/**
* 집행자가 아닌 한 발신자 스레드에서 작업 R을 실행합니다.
* 종료되었으며,이 경우 작업이 폐기됩니다.
* @param r 실행 요청 된 실행 가능한 작업
* @param e이 작업을 수행하려는 집행자
*/
공개 무효 거부 excution (runnable r, threadpoolexecutor e) {
if (! e.isshutdown ()) {
run ();
}
}
}
그러나이 전략은 생산자가 적을 때 생산자가 작업을 소비하는 동안 모든 작업을 소비했을 수 있으며, 생산자가 공허한 상태에있을 수 있습니다 이 과정을 계속 생성하면 소비자 스레드에서 굶주림이 발생할 수 있습니다.
비슷한 아이디어, 가장 간단한 방법을 언급하면, 거부 excutionHandler를 직접 정의하고 큐가 가득 차면 생산자의 차단을 실현할 때 다음을 변경할 수 있습니다.
코드 사본은 다음과 같습니다.
새로운 거부 executionHandler () {
@보수
공개 무효 거부 excution (Runnable R, ThreadPooleExecutor Executor) {
if (! executor.isshutdown ()) {
노력하다 {
executor.getqueue (). put (r);
} catch (InterruptedException e) {
// 중단되지 않아야합니다
}
}
}
};
이런 식으로, 우리는 더 이상 대기열과 소비자의 논리에 관심을 가질 필요가 없습니다.
원래 디자인과 비교할 때이 방법은 코드의 양을 줄이고 동시 환경에서 많은 문제를 피할 수 있습니다. 물론, 제출시 입가 제한으로 세마포어를 사용하는 것과 같은 다른 수단을 사용할 수도 있지만 생산자를 차단하려면 복잡해 보일 것입니다.