1. 스레드 풀의 기본 사용
1.1. 스레드 풀이 필요한 이유는 무엇입니까?
매일 비즈니스에서 멀티 스레딩을 사용하려면 비즈니스가 시작되기 전에 스레드를 생성하고 비즈니스가 끝나면 스레드를 파괴합니다. 그러나 비즈니스의 경우 스레드의 생성 및 파괴는 비즈니스 자체와 관련이 없으며 스레드가 수행하는 작업에만 관심이 있습니다. 따라서 비즈니스와 관련이없는 스레드를 만들고 파괴하기보다는 가능한 많은 CPU를 사용하여 작업을 수행하기를 희망합니다. 스레드 풀이이 문제를 해결합니다. 스레드 풀의 기능은 스레드를 재사용하는 것입니다.
1.2. JDK가 우리에게 어떤 지원을 제공합니까?
JDK의 관련 클래스 다이어그램은 위 그림에 나와 있습니다.
언급 할 몇 가지 특별 범주.
Callable 클래스는 Runable 클래스와 유사하지만 차이는 Callable이 반환 값을 가지고 있다는 것입니다.
ThreadPooleExecutor는 스레드 풀의 중요한 구현입니다.
집행자는 공장 수업입니다.
1.3. 스레드 풀 사용
1.3.1. 스레드 풀의 유형
public static executorService newfixedthreadpool (int nthreads) {새로운 ThreadPoolexecutor (nthreads, nthreads, 0l, timeUnit.milliseconds, new LinkedBlockingqueue <Runnable> ()); ThreadPoolExecutor (1, 1, 0L, TimeUnit.milliseconds, New LinkedBlockingQueue <Runnable> ());메소드 관점에서, FixedThreadpool, SingleTheRdreadExecutor 및 CachedThreadpool은 ThreadPooleExecutor의 다른 인스턴스이지만 매개 변수는 다릅니다.
public ThreadPooleExecutor (int corepoolsize, int maximumpoolsize, 롱 repialivetime, timeUnit 단위, blockingqueue <runnable> workqueue) {this (corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executor.defaultthreadfactory (), defaulthandler);}; ThreadPooleExecutor 생성자의 매개 변수의 의미를 간략하게 설명해 봅시다.
이런 식으로 위에서 언급 한 고정식 풀을 살펴보면 코어 수와 최대 스레드 수는 동일하므로 작업 중에 스레드가 생성되고 파괴되지 않습니다. 작업 수가 크고 스레드 풀의 스레드를 충족 할 수없는 경우 작업이 LinkedBlockingQueue에 저장되고 LinkedBlockingQueue의 크기는 정수입니다 .max_value. 즉, 작업을 지속적으로 추가하면 메모리가 점점 더 많이 소비됩니다.
Cachedthreadpool은 다릅니다. 핵심 스레드 번호는 0이고 최대 스토리지 수는 정수입니다 .max_value, 차단 큐는 특수 큐인 동기식 큐이고 크기는 0입니다. 코어 스레드 수는 0이므로 동기식에 작업을 추가해야합니다. 이 큐는 한 스레드가 데이터를 추가하고 다른 스레드가 데이터를 가져올 때만 성공할 수 있습니다. 이 대기열에만 데이터를 추가하면 실패가 반환됩니다. 리턴이 실패하면 스레드 풀이 스레드를 확장하기 시작하므로 CachedThreadpool의 스레드 수가 고정되지 않은 이유입니다. 스레드가 60 대에 사용되지 않으면 실이 파괴됩니다.
1.4. 스레드 풀 사용의 작은 예
1.4.1. 간단한 스레드 풀
import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class stroodpooldemo {public static class mytask emplements runnable {@override public void run () {system.out.currentln () + "thread id :"). try {thread.sleep (1000); } catch (예외 e) {e.printstacktrace (); }}} public static void main (String [] args) {mytask mytask = new Mytask (); ExecutorService es = executors.newfixedthreadpool (5); for (int i = 0; i <10; i ++) {es.submit (mytask); }}} NewFixedThreadpool (5)이 사용되지만 10 개의 스레드가 시작되기 때문에 한 번에 5 개가 실행되며 스레드 재사용이 보이는 것이 분명합니다. Threadid가 반복됩니다. 즉, 처음 5 개의 작업과 마지막 5 개의 작업은 동일한 스레드 배치에 의해 실행됩니다.
여기에 사용되는 것
es.submit (mytask);
제출 방법도 있습니다.
Es.Execute (Mytask);
차이점은 제출이 향후 개체를 반환한다는 것입니다. 나중에 도입 될 것입니다.
1.4.2.scheduledthreadpool
import java.util.concurrent.executors; import java.util.concurrent.scheduledexecutorservice; import java.util.concurrent.timeUnit; public stridepooldemo {public static void main (String] {scheduledexecutorservice ses = 10); // 이전 작업이 완료되지 않은 경우 발송이 시작되지 않습니다. ses.schedulewithfixedDelay (new runnable () {@override public void run () {try {strule.sleep (1000); system.out.println (system.currenttimemillis ()/1000);} catch (예외 e) {// to}}}, 0, timeUnit.); 초, 2 초마다 한 번씩 사이클에서 한 번 실행}}산출:
1454832514
1454832517
1454832520
1454832523
1454832526
...
작업 실행은 1 초가 걸리므로 작업 예약은 이전 작업이 완료되기까지 기다려야합니다. 즉, 여기서 2 초마다 이전 작업이 완료된 후 2 초 후에 새로운 작업이 시작됩니다.
2. 스레드 풀을 확장하고 향상시킵니다
2.1. 콜백 인터페이스
스레드 풀에는 확장 된 작업을 제공하기 위해 콜백 API가 있습니다.
executorService es = new ThreadPoolexecutor (5, 5, 0L, TimeUnit.seconds, New LinkedBlockingQueue <Runnable> ()) {@override protected void preforexecute (Thread T, Runnable R) {System.out.println ( "실행 준비"); } @override protected void afterExecute (runnable r, Throwable t) {System.out.println ( "실행 완료"); } @override protected void Terminated () {System.out.println ( "스레드 풀 종료"); }};스레드 실행 전후에 로그 관리 또는 기타 작업을 구현하기 위해 Prevexecute, Peccilecute, After Execute 및 The Thread PoolExecutor 방법을 구현할 수 있습니다. 스레드 풀 종료.
2.2. 거부 전략
때로는 작업이 매우 무겁기 때문에 시스템에 너무 많은 부하가 발생합니다. 위에서 언급 한 바와 같이, 작업 횟수가 증가하면 모든 작업이 FixedThreadPool의 차단 대기열에 배치되어 메모리 소비가 너무 많아서 결국 메모리 오버플로가됩니다. 그러한 상황은 피해야합니다. 따라서 스레드 수가 최대 스레드 수를 초과한다는 것을 알게되면 일부 작업을 포기해야합니다. 폐기 할 때 작업을 직접 버리는 대신 작업을 기록해야합니다.
ThreadPooleExecutor에는 다른 생성자가 있습니다.
Public ThreadPooleExecutor (int corepoolsize, int maximumpoolsize, leg recoyalivetime, timeUnit itor, blockingqueue <runnable> workqueue, strandfactory strandfactory, 거부 거부 executionHandler handler) {if (corepoolize <0 || maxImumpoolsize <0 | 불법 행위 덱싱 (); if (workqueue == null || threadfactory == null || handler == null) 던지기 nullpointerexception (); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = Workqueue; this.keepalivetime = init.tonanos (recopalivetime); this.threadFactory = ThreadFactory; this.handler = handler; } 나중에 ThreadFactory를 소개합니다.
핸들러는 정책의 구현을 거부하며, 이는 작업을 실행할 수없는 경우 어떻게 해야하는지 알려줍니다.
위에는 4 가지 전략이 있습니다.
Abortpolicy : 작업을 수락 할 수 없으면 예외가 발생합니다.
Callerrunspolicy : 작업을 수락 할 수없는 경우 호출 스레드를 완료하십시오.
DiscrardoldestPolicy : 작업을 수락 할 수없는 경우 가장 오래된 작업은 대기열에 의해 폐기되고 유지됩니다.
DiscardPolicy : 작업을 수락 할 수 없으면 작업이 폐기됩니다.
ExecutorService es = new ThreadPoolexecutor (5, 5, 0L, TimeUnit.seconds, New LinkedBlockingQueue <Runnable> (), New RejectedExecutionHandler () {@override public void RejectedExecution (runnable r, ThreadPoolexecutor) {r.ToStred (r.toTring); }); 물론, 우리는 거부 excutionHandler 인터페이스를 스스로 구현하여 거부 정책을 스스로 정의 할 수 있습니다.
2.3. 스레드 factory를 사용자 정의합니다
ThreadPooleExexecutor의 생성자에 ThreadFactory를 지정할 수 있음을 방금 보았습니다.
스레드 풀의 스레드는 모두 스레드 공장에서 생성되며 스레드 공장을 사용자 정의 할 수 있습니다.
기본 스레드 공장 :
정적 클래스 DefaultThreadFactory 구현 threadFactory {private static final atomicinteger poolnumber = new atomicinteger (1); 개인 최종 스레드 그룹 그룹; 개인 최종 Atomicinteger ThreadNumber = New Atomicinteger (1); 개인 최종 문자열 namepRefix; defaultthreadFactory () {securityManager s = system.getSecurityManager (); 그룹 = (s! = null)? s.getThreadgroup () : thread.currentthread (). getTheRreadGroup (); naMepRefix = "pool-" + poolNumber.getAndIncrement () + "-thread-"; } public 스레드 newthread (runnable r) {스레드 t = 새 스레드 (Group, r, namepRefix + ThreadNumber.getAndIncrement (), 0); if (t.isdaemon ()) t.setdaemon (false); if (t.getPriority ()! = Thread.norm_priority) T.SetPriority (Thread.norm_priority); 반환 t; }}3. 포크 조인
3.1. 생각
그것은 나누고 정복하는 아이디어입니다.
Fork/Join은 MapReduce 알고리즘과 유사합니다. 이 둘의 차이점은 다음과 같습니다. 포크/조인은 작업이 매우 큰 경우와 같이 필요한 경우에만 작은 작업으로 나뉩니다. MapReduce는 항상 세분화의 첫 단계를 수행하기 시작합니다. MapReduce는 분산 시스템에 적합한 반면 Fork/Join은 JVM 내 스레드 레벨에 더 적합한 것으로 보입니다.
4.2. 인터페이스 사용
재귀 반응 : 반환 값 없음
recursivetask : 반환 값이 있습니다
4.3. 간단한 예
import java.util.arraylist; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkjointask; import java.util.concurrent.recursiveTask; public class counttask 확장 recursiveTask <long> {private static final inrsthold = 10000; 개인 장기 시작; 개인 장수; public counttask (Long Start, Long End) {super (); this.start = 시작; this.end = 끝; } @override protected long compute () {long sum = 0; 부울 cancompute = (종료 - 시작) <임계 값; if (cancompute) {for (long i = start; i <= end; i ++) {sum = sum+i; }} else {// 100 개의 작은 작업으로 나뉘어 긴 단계 = (start + end)/100; ArrayList <counttask> 서브 타스크 = New Arraylist <counttask> (); 긴 pos = 시작; for (int i = 0; i <100; i ++) {long lastone = pos+step; if (lastone> end) {lastone = end; } Counttask 서브 타스크 = New Counttask (POS, Lastone); pos + = step + 1; 하위 작업. ADD (서브 타스크); 서브 타스크 .fork (); // 하위 작업을 (Counttask T : Subtasks) {sum += t.join (); // 모든 서브 타스크가 종료되기를 기다리는}}} return sum; } public static void main (String [] args) {Forkjoinpool ForkJoinPool = 새로운 ForkJoinPool (); Counttask Task = New Counttask (0, 200000L); Forkjointask <long> 결과 = ForkjoinPool.Submit (작업); try {long res = result.get (); System.out.println ( "sum =" + res); } catch (예외 e) {// todo : 핸들 예외 e.printstacktrace (); }}} 위의 예는 요약하는 작업을 설명합니다. 누적 된 작업을 100 개의 작업으로 나누고, 각 작업은 숫자의 합만 수행하며, 최종 조인 후에 각 작업에 의해 계산 된 합이 축적됩니다.
4.4. 구현 요소
4.4.1. Workqueue 및 CTL
각 스레드에는 작업 대기열이 있습니다
정적 최종 클래스 Workqueue
작업 대기열에는 스레드를 관리하는 일련의 필드가 있습니다.
휘발성 int eventCount; // 인코딩 된 비활성화 수; 비활성 인 경우 <0
int nextwait; // 다음 이벤트 웨이터의 인코딩 된 레코드
int Narrows; // 강철 수
int 힌트; // 스틸 인덱스 힌트
짧은 풀인 덱스; // 풀 에서이 대기열의 색인
최종 짧은 모드; // 0 : lifo,> 0 : fifo, <0 : 공유
휘발성 int qlock; // 1 : 잠금, -1 : 종료; else 0
휘발성 int베이스; // 여론 조사를위한 다음 슬롯의 색인
int 상단; // 푸시용 다음 슬롯의 색인
Forkjointask <?> [] 배열; // 요소 (처음에는 할당되지 않음)
최종 포크 풀 풀; // 함유 풀 (null 일 수 있음)
최종 ForkjoinWorkerThread 소유자; // 공유 된 경우 스레드 또는 NULL을 소유합니다
휘발성 스레드 파커; // == 공원 전화 중 소유자; 그렇지 않으면 널
휘발성지게 <?> currentJoin; // AWAITJOIN에서 작업 중입니다
Forkjointask <?> currentsteal; // 현재 비 국한 작업이 실행 중입니다
여기서 Forkjoin의 구현에서 JDK7과 JDK8 사이에 큰 차이가 있음에 주목해야합니다. 우리가 여기에서 소개하는 것은 JDK8입니다. 스레드 풀에서 때로는 모든 스레드가 실행되는 것은 아니며 일부 스레드가 매달리고 매달린 스레드는 스택에 저장됩니다. 내부적으로 링크 된 목록으로 표시됩니다.
Nextwait은 다음 대기 실을 가리 킵니다.
PoolIndex 스레드 풀에서 첨자의 인덱스 인덱스.
EventCount 초기화되면 EventCount는 PoolIndex와 관련이 있습니다. 총 32 비트, 첫 번째 비트는 그것이 활성화되었는지 여부를 나타내며 15 비트는 중단 된 횟수를 나타냅니다.
EventCount, 나머지는 PoolIndex를 나타냅니다. 하나의 필드를 사용하여 여러 의미를 나타냅니다.
Workqueue Workqueue는 Forkjointask <?> [] 배열로 표시됩니다. 상단과베이스는 큐의 양쪽 끝을 나타내고 데이터는이 두 가지 사이에 있습니다.
Forkjoinpool에서 CTL (64 비트 길이 유형)을 유지하십시오
휘발성 긴 CTL;
* Field CTL은 다음과 같이 포장되어 있습니다.
* AC : 액티브 달리기 작업자 수에서 대상 병렬 처리 (16 비트)
* TC : 총 작업자 수에서 대상 병렬 처리 (16 비트)
* ST : 수영장이 종료되면 (1 비트) True
* EC : 대기 대기 스레드의 대기 수 (15 비트)
* ID : Treiber Stack of Waiters (16 비트)의 Poolindex
AC는 활성 스레드 카운트를 뺀 것과 평행 정도를 나타냅니다 (아마도 CPU 수)
TC는 총 스레드 수를 뺀 것입니다
ST는 스레드 풀 자체가 활성화되었는지 여부를 나타냅니다
EC는 상단 대기 시간에 현탁 된 스레드 수를 나타냅니다.
ID는 상단의 스레드를 기다리는 PoolIndex를 나타냅니다.
ST+EC+ID는 우리가 방금 EventCount라고 불리는 것이 분명합니다.
그렇다면 왜 5 개의 변수로 변수를 종합해야합니까? 실제로 용량은 5 개의 변수와 거의 동일합니다.
가변 코드를 사용하는 가독성은 훨씬 더 나빠질 것입니다.
그렇다면 왜 변수를 사용합니까? 실제로, 이것은 가장 영리한 것입니다.이 5 가지 변수는 전체이기 때문입니다. 멀티 스레딩에서 5 개의 변수를 사용하는 경우 변수 중 하나를 수정할 때 5 개의 변수의 무결성을 보장하는 방법. 그런 다음 변수를 사용하면이 문제가 해결됩니다. 자물쇠로 해결되면 성능이 저하됩니다.
변수를 사용하면 데이터의 일관성과 원자력이 보장됩니다.
Forkjoin Squadron CTL의 변경 사항은 모두 CAS 작업을 사용하여 수행됩니다. 이전 기사 시리즈에서 언급했듯이 CAS는 잠금 작동이며 성능이 우수합니다.
CAS 작업은 하나의 변수 만 타겟팅 할 수 있으므로이 설계는 최적입니다.
4.4.2. 작업 도난
다음으로 전체 스레드 풀의 워크 플로를 소개합니다.
각 스레드는 런 워크를 호출합니다
최종 무효 런 워크 (Workqueue W) {w.growarray (); // (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} 스캔 () 함수는 수행 할 작업을 스캔하는 것입니다.
R은 비교적 임의의 숫자입니다.
개인 최종 int 스캔 (Workqueue W, Int R) {Workqueue [] ws; int m; 긴 C = CTL; // 일관성을 확인하십시오. int b, e; Forkjointask <?> [] a; Forkjointask <?> t; if ((q = ws [(r -j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = ((A.length -1) & b) << awift) + abase; if ((t = ((ForkjointAsk <?>) u.getObjectVolatile (a, i))! = null) {if (ec <0) HelpRelease (if (ec <0) HelpRelease (if (ec <0); else if (q.base == b && u.compareAndswapoBject (a, i, t, null)) {u.putorderedint (q, qbase, b + 1); if ((b + 1) -q.top <0) 신호 작업 (ws, q); W.Runtask (T); } } 부서지다; } else if (-j <0) {if ((ec | (e = (e = (int) c)) <0) // 비활성 또는 종료 리턴 awaitwork (w, c, ec); else if (ctl == c) {// 긴 nc = (long) ec | ((c -ac_unit) & (ac_mask | tc_mask)); W.NextWait = e; w.eventCount = ec | int_sign; if (! u.compareAndswaplong (this, ctl, c, nc)) w.eventcount = ec; // back} break; }}} 반환 0; } 스캔 방법을 살펴 보겠습니다. 스캔의 하나의 매개 변수는 Workqueue입니다. 위에서 언급 한 바와 같이, 각 스레드에는 작업 큐가 있고 여러 스레드의 작업 큐는 작업 큐에 저장됩니다. R은 임의의 숫자입니다. r을 사용하여 Workqueue를 찾아 Workqueue에서 수행 할 작업이 있습니다.
그런 다음 Workqueue Base를 통해 기본 오프셋을 가져옵니다.
B = Q.베이스
..
긴 i = (((A.length -1) & b) << ashift) + abase;
..
그런 다음 오프셋을 통해 마지막 작업을 수행 하고이 작업을 실행하십시오.
t = ((Forkjointask <?>) U.GetObjectVolatile (a, i))
..
W.Runtask (T);
..
이 대략적인 분석을 통해 현재 스레드가 스캔 메소드를 호출 한 후 현재 Workqueue에서 작업을 실행하지는 않지만 임의의 숫자 r을 통해 다른 Workqueue 작업을 얻을 수 있음을 발견했습니다. 이것은 Forkjoinpool의 주요 메커니즘 중 하나입니다.
현재 스레드는 자체 작업에 중점을 둘뿐만 아니라 다른 작업의 우선 순위를 정합니다. 이것은 굶주림이 일어나지 못하게합니다. 이렇게하면 일부 스레드가 고착되거나 다른 이유로 인해 작업을 완료 할 수 없거나 스레드에 많은 양의 작업이 있지만 다른 스레드에는 할 일이 없습니다.
그런 다음 Runtask 방법을 살펴 보겠습니다
Final void Runtask (ForkjointAsk <?> task) {if ((currentSteal = task)! = null) {ForkJoinWorkerTheRdread 스레드; task.doexec (); Forkjointask <?> [] a = 배열; int md = 모드; ++ nsteals; currentsteal = null; if (md! = 0) pollandexecall (); else if (a! = null) {int s, m = a.length -1; Forkjointask <?> t; while ((s = top -1) - base> = 0 && (t = (Forkjointask <?>) u.getAndsetObject (a, ((m & s) << ashift)) + abase, null)! = null) {top = s; t.doexec (); }} if ((스레드 = 소유자)! = null) // 마지막 절단에서 할 필요가 없습니다. }}흥미로운 이름이 있습니다 : Currentsteal, 도난당한 과제는 실제로 내가 방금 설명한 것입니다.
task.doexec ();
이 작업은 완료됩니다.
다른 사람들의 작업을 완료하면 자신의 작업을 완료하게됩니다.
정상을 얻음으로써 첫 번째 작업을 얻으십시오
while ((s = top -1) - base> = 0 && (t = (Forkjointask <?>) u.getAndsetObject (a, ((m & s) << ashift)) + abase, null)! = null) {top = s; t.doexec ();}다음으로 그래프를 사용하여 지금은 스레드 풀의 프로세스를 요약하십시오.
예를 들어, 두 개의 스레드 T1과 T2가 있습니다. T1은 T2의베이스를 통해 T2의 마지막 작업을 얻게됩니다 (물론, 실제로는 임의의 숫자 R을 통한 스레드의 마지막 작업입니다). T1은 자체 상단을 통해 첫 번째 작업을 수행합니다. 반대로, T2도 똑같이 할 것입니다.
다른 스레드에 대한 작업은 기본에서 시작되며 자신을 위해 취하는 작업은 맨 위에서 시작됩니다. 이것은 갈등을 줄입니다
다른 작업이없는 경우
else if (-j <0) {if ((ec | (e = (int) c)) <0) // 비활성 또는 종료 리턴 awaitwork (w, c, ec); else if (ctl == c) {// 긴 nc = (long) ec | ((c -ac_unit) & (ac_mask | tc_mask)); W.NextWait = e; w.eventCount = ec | int_sign; if (! u.compareAndswaplong (this, ctl, c, nc)) w.eventcount = ec; // back} break; } 먼저, CTL의 값은 일련의 실행을 통해 변경되며, NC가 얻어지고 새로운 값이 CAS와 함께 할당됩니다. 그런 다음 Awaitwork ()를 호출하여 대기 상태에 입력하십시오 (이전 기사에서 언급 한 안전하지 않은 공원 방법이라고 함).
여기서 설명해야 할 것은 CTL 값을 변경하는 것입니다. 여기서, CTL의 AC -1 및 AC는 CTL의 상위 16 비트를 차지하므로 직접 -1이 될 수는 없지만 대신 처음 16 비트의 CTL에서 AC_UNIT (0x100000000000000)의 상위 16 비트를 만드는 효과를 달성합니다.
앞에서 언급했듯이 EventCount는 PoolIndex를 저장하고 Workqueue의 Poolindex 및 Nextwait을 통해 모든 대기 스레드를 가로 질러 갈 수 있습니다.