1。スレッドプールの基本的な使用
1.1。なぜスレッドプールが必要なのですか?
毎日のビジネスでは、マルチスレッドを使用したい場合は、ビジネスが始まる前にスレッドを作成し、ビジネスが終了した後にスレッドを破壊します。ただし、ビジネスの場合、スレッドの作成と破壊はビジネスそのものとは関係ありません。また、スレッドによって実行されるタスクのみを気にします。したがって、ビジネスに関連していないスレッドを作成および破壊するのではなく、タスクを実行するために、できるだけ多くのCPUを使用したいと考えています。スレッドプールはこの問題を解決します。スレッドプールの機能は、スレッドを再利用することです。
1.2。 JDKはどのようなサポートを提供していますか
JDKの関連するクラス図を上の図に示します。
言及すべきいくつかの特別なカテゴリ。
呼び出し可能なクラスは実行可能なクラスに似ていますが、違いは、呼び出し可能なものが返品値を持っていることです。
ThreadPoolexecutorは、スレッドプールの重要な実装です。
執行者は工場のクラスです。
1.3。スレッドプールの使用
1.3.1。スレッドプールの種類
public static executorservice newfixedthreadpool(int nthreads){return new SthreadPoolexecutor(nthreads、nthreads、0l、timeunit.milliseconds、new linkedblockingqueue <runnable>()); threadPoolexecutor(1、1、0L、TimeUnit.MilliseConds、new linkedblockingqueue <runnable>());} public static executorservice newcachedthreadpool(){return new swreetpoolexecutor(0、integer.max_value、60l、timeunit.seconds.seconds.seconds.seconds.seconds.メソッドの観点からは、FixedThreadPool、SingleThreadExecutor、およびCachedThreadPoolがThreadPoolexecutorの異なるインスタンスであることは明らかですが、パラメーターは異なります。
public StreadPoolexecutor(int CorePoolsize、int maximumpoolsize、long keepalivetime、timeUnit unit、blockingqueue <runnable> workqueue){this(corepoolsize、maximumpoolsize、keepalivetime、unit、workqueue、executors.defaulthreadfactory()、defaulthler);} ThreadPoolexecutorコンストラクターのパラメーターの意味を簡単に説明しましょう。
このようにして、上記のFixedThreadPoolを見ると、コアの数と最大スレッド数は同じであるため、作業中にスレッドは作成されて破壊されません。タスクの数が大きく、スレッドプールのスレッドが満たされない場合、タスクはLinkedBlockingQueueに保存され、LinkedBlockingQueueのサイズはinteger.max_valueです。これは、タスクを継続的に追加すると、メモリがますます消費されることを意味します。
cachedthreadpoolは異なります。そのコアスレッド番号は0、ストレージの最大数は整数です。max_value、そのブロッキングキューは特別なキューであり、そのサイズは0です。コアスレッドの数は0であるため、タスクをSynchronusqueueに追加する必要があります。このキューは、1つのスレッドがそこからデータを追加し、別のスレッドがデータを取得した場合にのみ成功します。このキューにデータを追加するだけで、障害が返されます。リターンが失敗すると、スレッドプールがスレッドの展開を開始するため、CachedThreadPoolのスレッドの数が固定されていません。スレッドが60年代に使用されない場合、スレッドは破壊されます。
1.4。スレッドプールの使用の小さな例
1.4.1。シンプルなスレッドプール
java.util.concurrent.executorservice; Import java.util.concurrent.executors; public static class mytaskを実装するpublic static class mytaskを実装します{@override public void run(){system.out.println(system.currenttlnis() try {thread.sleep(1000); } catch(Exception e){e.printstacktrace(); }}} public static void main(string [] args){mytask mytask = new mytask(); executorservice es = executors.newfixedthreadpool(5); for(int i = 0; i <10; i ++){es.submit(mytask); }}} NewFixedThreadPool(5)が使用されているが、10個のスレッドが開始されるため、5個が一度に実行されるため、スレッドの再利用が見られることは明らかです。 ThreadIDは繰り返されます。つまり、最初の5つのタスクと最後の5つのタスクは、同じスレッドのバッチによって実行されます。
ここで使用されるもの
es.submit(mytask);
提出する方法もあります。
es.execute(mytask);
違いは、送信が将来のオブジェクトを返すことであり、後で導入されます。
1.4.2.scheduledthreadpool
java.util.concurrent.executors; Import java.util.concurrent.scheduledexecutorservice; Import java.util.concurrent.timeunit; public static void main(string [] args){scheduledexecutorerervice ses = executors.newschedtors.newspood; //前のタスクが完了していない場合、ディスパッチは開始されません。 SES.SCHEDULEWITHFIXEDDELAY(new runnable(){@Override public void run(){try {shood.sleep(1000); system.out.println(system.currenttimemillis()/1000);} catch(例外E){//例外}秒、そしてサイクルで2秒ごとに1回実行}}出力:
1454832514
1454832517
1454832520
1454832523
1454832526
...
タスクの実行には1秒かかるため、タスクスケジューリングは前のタスクが完了するまで待つ必要があります。つまり、ここで2秒ごとに、前のタスクが完了してから2秒後に新しいタスクが開始されることを意味します。
2。スレッドプールを拡張および強化します
2.1。コールバックインターフェイス
スレッドプールには、拡張操作を提供するコールバックAPIがいくつかあります。
executorservice es = new SthreadPoolexecutor(5、5、0L、TimeUnit.seconds、new LinkedBlockingQueue <Runnable>()){@Override Protected void beforeexecute(thread t、runnable r){system.out.println( "実行するための準備"); } @Override Protected void afterexecute(runnable r、throwable t){system.out.println( "execution reftelement"); } @Override Protected void terminated(){system.out.println( "スレッドプール出口"); }};スレッドプールの実行の前後にログ管理またはその他の操作を実装するために、スレッドプールエクセクターのexedecute、afferexecute、および終了方法を実装できます。
2.2。拒否戦略
時には、タスクが非常に重いため、システムに負荷がかかりすぎます。上記のように、タスクの数が増加すると、すべてのタスクが固定Threadpoolのブロッキングキューに配置され、メモリの消費量が多すぎて最終的にメモリオーバーフローになります。そのような状況は避けるべきです。したがって、スレッドの数がスレッドの最大数を超えていることがわかった場合、いくつかのタスクを放棄する必要があります。廃棄するときは、タスクを直接捨てるのではなく、タスクを書き留める必要があります。
ThreadPoolexecutorには別のコンストラクターがあります。
Public SthreadPoolexecutor(int CorePoolsize、int maximumpoolsize、long keepalivetime、timeUnit unit、blockingqueue <runnable> workqueue、threadefactory threadfactory、rexededexecutionhandler Handler){if(corepoolsize <0 || maximumpoolsize <= 0 || maximumsizizezizeizizeizezizezizezizeize IllegalArgumentException(); if(workqueue == null || threadfactory == null || handler == null)新しいnullpointerexception(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadFactory; this.handler = handler; }後でThreadFactoryを紹介します。
ハンドラーは、ポリシーの実装を拒否します。これにより、タスクを実行できない場合はどうすればよいかがわかります。
上記の4つの戦略があります。
AbortPolicy:タスクを受け入れられない場合、例外がスローされます。
CallerrunSpolicy:タスクを受け入れられない場合は、呼び出しスレッドを完了します。
DisdardOldESTPolicy:タスクを受け入れられない場合、最古のタスクはキューによって破棄および維持されます。
DiscardPolicy:タスクを受け入れられない場合、タスクは破棄されます。
executorservice es = new SthreadPoolexecutor(5、5、0L、TimeUnit.seconds、new linkedblockingqueue <runnable>()、new rejectedexecutionhandler(){@Override public void reljectedexecution(runnable r、runnable r、threadpoolexecutor executor)もちろん、拒否ポリシーを自分自身で定義するために、拒否ExexecutionHandlerインターフェイスを実装することもできます。
2.3。 ThreadFactoryをカスタマイズします
ThreadFactoryをStreadPoolexecutorのコンストラクターで指定できることを見てきました。
スレッドプールのスレッドはすべてスレッドファクトリーによって作成されており、スレッドファクトリをカスタマイズできます。
デフォルトのスレッドファクトリー:
静的クラスdefaultThreadFactoryを実装するthreadfactory {private static final atomicinteger poolnumber = new AtomicInteger(1);プライベートファイナルスレッドグループグループ。プライベート最終AtomicInteger ThreadNumber = new AtomicInteger(1);プライベートファイナルストリングnameprefix; DefaultThreadFactory(){SecurityManager S = System.GetSeCurityManager(); group =(s!= null)? s.getThreadGroup():thread.currentThread()。getThreadGroup(); nameprefix = "pool-" + poolnumber.getandincrement() + "-thread-"; } public thread newThread(runnable r){thread t = newスレッド(group、r、nameprefix + threadnumber.getandincrement()、0); if(t.isdaemon())t.setdaemon(false); if(t.getPriority()!= thread.norm_priority)t.setpriority(thread.norm_priority); tを返します。 }}3。forkjoin
3.1。考え
それは分裂と征服のアイデアです。
フォーク/結合は、MapReduceアルゴリズムに似ています。 2つの違いは次のとおりです。フォーク/結合は、タスクが非常に大きい場合など、必要な場合にのみ小さなタスクに分割されますが、MapReduceは常にセグメンテーションの最初のステップを実行し始めます。 MapReduceは分散システムに適しているのに対し、Fork/JoinはJVM内のスレッドレベルにより適しているようです。
4.2.インターフェイスの使用
再帰反応:返品値はありません
recursivetask:返品値があります
4.3。簡単な例
Import java.util.arraylist; Import java.util.concurrent.forkjoinpool; Import java.util.concurrent.forkjointask; Import java.util.concurrent.recursivetask;パブリッククラスのcounttaskはrecursivetask <プライベートロングスタート。プライベートロングエンド; public countTask(ロングスタート、ロングエンド){super(); this.start = start; this.end = end; } @Override Protected long compute(){long sum = 0; boolean cancompute =(end -start)<threshold; if(cancompute){for(long i = start; i <= end; i ++){sum = sum+i; }} else {// 100の小さなタスクに分割されていますlong step =(start + end)/100; arrayList <CountTask> subtasks = new ArrayList <CountTask>(); long pos = start; for(int i = 0; i <100; i ++){long lastone = pos+step; if(lastone> end){lastone = end; } countTask subtask = new countTask(pos、lastone); POS + =ステップ + 1; subtasks.add(subtask); subtask.fork(); //サブタスクをスレッドプールにプッシュ} for(countTask T:Subtasks){sum += t.join(); //すべてのサブタスクが終了するのを待っています}} return sum; } public static void main(string [] args){forkjoinpool forkjoinpool = new forkjoinpool(); countTask task = new countTask(0、200000L); forkjointask <long> result = forkjoinpool.submit(task); try {long res = result.get(); System.out.println( "sum =" + res); } catch(例外e){// todo:例外を処理するe.printstacktrace(); }}}上記の例では、要約するタスクについて説明しています。蓄積されたタスクを100のタスクに分割し、各タスクは数値の合計のみを実行し、最終結合後、各タスクで計算された合計が蓄積されます。
4.4。実装要素
4.4.1.workqueueおよびctl
各スレッドには作業キューがあります
静的な最終クラスのワークキュー
作業キューには、スレッドを管理する一連のフィールドがあります。
揮発性int eventCount; //エンコードされた不活性化カウント。 <0非アクティブの場合
int nextwait; //次のイベントウェイターのエンコードされたレコード
int nrows; //鋼の数
intヒント; //スチールインデックスヒント
short poolindex; //プール内のこのキューのインデックス
最終的なショートモード。 // 0:Lifo、> 0:fifo、<0:共有
揮発性のQlock; // 1:ロックされた、-1:終了します。その他0
揮発性INTベース; //ポーリングの次のスロットのインデックス
int top; //プッシュのための次のスロットのインデックス
forkjointask <?> [] array; //要素(最初は未割り当て)
最終的なForkjoinpoolプール。 //含まれるプール(nullになる可能性があります)
最終forkjoinworkerthreadの所有者。 //共有されている場合は、スレッドまたはnullを所有しています
揮発性スレッドパーカー; // ==公園への呼び出し中の所有者。それ以外の場合はnull
揮発性のforkjointask <?> currentJoin; // awaitjoinに参加するタスク
forkjointask <?> currentSteal; //実行中の現在の非局所タスク
ここで、ForkJoinの実装にはJDK7とJDK8の間に大きな違いがあることに注意する必要があります。ここで紹介しているのはJDK8からです。スレッドプールでは、すべてのスレッドが実行されているわけではない場合があり、一部のスレッドが吊り下げられ、それらの吊り下げスレッドはスタックに保存されます。リンクされたリストで内部的に表されます。
NextWaitは、次の待機スレッドを指します。
poolindexスレッドプールの添え字のインデックスインデックス。
EventCount Initialized、EventCountはpoolindexに関連しています。合計32ビット、最初のビットはそれがアクティブ化されているかどうかを示し、15ビットは吊り下げられた回数を示します
eventCount、残りはpoolindexを表します。 1つのフィールドを使用して、複数の意味を表します。
workqueue workqueueは、forkjointask <?> [] arrayで表されます。上部とベースはキューの両端を表し、データはこれら2つの間にあります。
forkjoinpoolでCTL(64ビットの長いタイプ)を維持します
揮発性長いCTL;
*フィールドCTLには長い梱包があります。
* AC:アクティブランニングワーカーの数を差し引いてターゲット並列性(16ビット)
* TC:総労働者の数から目標並列性(16ビット)を差し引いた
* ST:プールが終了した場合(1ビット)真実
* EC:トップ待機スレッドの待機数(15ビット)
* ID:トップオブトレイバースタックのトップのプールインデックス(16ビット)
ACは、アクティブスレッドカウントから並列性の程度(おそらくCPUの数)を差し引いて表します
TCとは、スレッドの総数を差し引くことを意味します
STは、スレッドプール自体がアクティブ化されているかどうかを示します
ECは、一番の待ち時間で中断されたスレッドの数を表します
IDはプールインデックスが上部のスレッドを待っていることを示します
ST+EC+IDがEventCountと呼ばれるものであることは明らかです。
では、なぜ5つの変数を持つ変数を合成する必要があるのですか?実際、容量は5つの変数でほぼ同じを占めています。
変数コードを使用する読みやすさは、はるかに悪化します。
では、なぜ変数を使用するのですか?実際、これは最も賢いことです。なぜなら、これらの5つの変数は全体であるからです。マルチスレッドでは、5つの変数が使用される場合、変数の1つを変更するときは、5つの変数の整合性を確保する方法です。次に、変数を使用すると、この問題が解決します。ロックで解決すると、性能は低下します。
変数を使用すると、データの一貫性と原子性が保証されます。
Forkjoin Squadron CTLの変更はすべて、CAS操作を使用して行われます。以前の一連の記事で述べたように、CASはロックフリーの操作であり、パフォーマンスが優れています。
CAS操作は1つの変数のみをターゲットにできるため、この設計は最適です。
4.4.2。仕事の盗難
次に、スレッドプール全体のワークフローを紹介します。
各スレッドはRunWorkerを呼び出します
final void runworker(workqueue w){w.growarray(); //(int r = w.hint; scan(w、r)== 0;){r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} scan()関数は、タスクを実行するためにスキャンすることです。
Rは比較的乱数です。
プライベートファイナルイントスキャン(WorkQueue W、int R){workqueue [] ws; int m;長いc = ctl; //一貫性チェックの場合((ws = workqueues)!= null &&(m = ws.length -1)> = 0 && w!= null){for(int j = m + m + 1、ec = w.eventcount ;;){workqueue q; int b、e; forkjointask <?> [] a; forkjointask <?> t; if((q = ws [(r -j)&m])!= null &&(b = q.base)-q.top <0 &&(a = q.array)!= null){long i =(((a.length -1)&b)<< ashift) + abase; if((t =((forkjointask <?>)u.getObjectvolatile(a、i))!= null){if(ec <0)helpRelease(c、ws、w、q、b); else if(q.base == b && u.compareandswapobject(a、i、t、null)){u.putorderedint(q、qbase、b + 1); if((b + 1)-q.top <0)Signalwork(ws、q); w.runtask(t); } } 壊す; } else if(-j <0){if((ec |(e =(int)c))<0)// inactiveまたはexenating return awaitwork(w、c、ec); else if(ctl == c){// long nc =(long)ec | ((c -ac_unit)&(ac_mask | tc_mask)); w.nextwait = e; W.EventCount = ec | int_sign; if(!u.compareandswaplong(this、ctl、c、nc))w.eventcount = ec; //バックアウト} break; }}} return 0; }スキャン方法を見てみましょう。スキャンのパラメーターの1つはWorkQueueです。上記のように、各スレッドにはワークキューがあり、複数のスレッドのワークキューはワークキューに保存されます。 Rは乱数です。 Rを使用してWorkQueueを見つけ、WorkQueueで実行するタスクを実行します。
次に、Workqueueベースを介して、ベースオフセットを取得します。
b = q.base
..
long i =(((a.length -1)&b)<< ashift) + abase;
..
次に、オフセットを介して最後のタスクを取得し、このタスクを実行します
t =((forkjointask <?>)u.getobjectvolatile(a、i))
..
w.runtask(t);
..
この大まかな分析により、現在のスレッドがスキャンメソッドを呼び出すと、現在のWorkQueueのタスクは実行されないが、乱数rを介して他のワークキュータスクを取得することがわかりました。これは、Forkjoinpoolの主要なメカニズムの1つです。
現在のスレッドは、独自のタスクに焦点を当てるだけでなく、他のタスクに優先順位を付けます。これにより、飢えが起こらないようにします。これにより、いくつかのスレッドがスタックまたはその他の理由により時間内にタスクを完了できないことを防ぎます。また、スレッドには大量のタスクがありますが、他のスレッドには何もすることがありません。
次に、Runtaskメソッドを見てみましょう
final void runtask(forkjointask <? task.doexec(); forkjointask <?> [] a = array; int md = mode; ++ nSteals; currentSteal = null; if(md!= 0)pollandexecall(); else if(a!= null){int s、m = a.length -1; forkjointask <?> t; while((s = top -1) - base> = 0 &&(t =(forkjointask <?>)u.getandsetobject(a、((m&s)<< ashift) + abase、null)!= null){top = s; t.doexec(); }} if((thread = owner)!= null)//最終的にclause thread.aftertoplevelexec()で行う必要はありません。 }}興味深い名前があります:CurrentSteal、盗まれたタスクは確かに私が今説明したものです。
task.doexec();
このタスクは完了します。
他の人のタスクを完了すると、独自のタスクを完了します。
トップを取得して最初のタスクを取得します
while((s = top -1) - base> = 0 &&(t =(forkjointask <?>)u.getandsetobject(a、((m&s)<< ashift) + abase、null)!= null){top = s; t.doexec();}次に、グラフを使用して、今すぐスレッドプールのプロセスを要約します。
たとえば、2つのスレッドT1とT2があります。 T1はT2のベースを介してT2の最後のタスクを取得します(もちろん、実際には乱数Rを介したスレッドの最後のタスクです)。T1は、独自のトップを通じて最初のタスクを実行します。それどころか、T2は同じことをします。
他のスレッドにかかるタスクはベースから始まり、自分のために取るタスクは上から始まります。これにより、競合が減少します
他のタスクが見つからない場合
else if(--j <0){if((ec |(e =(int)c))<0)// inactiveまたはexenating return awaitwork(w、c、ec); else if(ctl == c){// long nc =(long)ec | ((c -ac_unit)&(ac_mask | tc_mask)); w.nextwait = e; W.EventCount = ec | int_sign; if(!u.compareandswaplong(this、ctl、c、nc))w.eventcount = ec; //バックアウト} break; }次に、最初に、CTLの値が一連の実行によって変更され、NCが取得され、新しい値がCASで割り当てられます。次に、待機状態(以前の一連の記事で言及されているUnsafe's Park Methodと呼ばれる)を入力するために、待機work()を呼び出します。
ここで説明する必要があるのは、CTL値を変更することです。ここで、最初に、CTLのAC -1、およびACはCTLの上位16ビットを占有するため、直接-1にすることはできませんが、代わりにCTL -1の上位16ビット(0x1000000000000000000000000)をCTLの最初の16ビット(0x100000000000000000000)にする効果を達成します。
前述のように、EventCountはpoolindexを保存し、poolindexとWorkqueueのNextwaitを介して、すべての待機スレッドを通過できます。