[高い並行性Java II]マルチスレッドファンデーションでは、最初に基本的なスレッド同期操作について言及しました。今回に言及したいのは、同時パッケージの同期制御ツールです。
1。さまざまな同期制御ツールの使用
1.1 REENTRANTLOCK
ReentrantLockは、同期の強化バージョンのように感じます。同期の特徴は、使用が簡単であり、すべてが処理のためにJVMに残されていることですが、その機能は比較的弱いことです。 JDK1.5の前に、ReentrantLockのパフォーマンスは同期よりも優れていました。 JVMの最適化により、現在のJDKバージョンの2つのパフォーマンスは同等です。簡単な実装の場合は、reentrantlockを意図的に使用しないでください。
同期と比較して、ReentrantLockはより機能的に豊富であり、リエントラント、中断可能、限られた時間、および公正なロックの特性を備えています。
まず、例を使用して、ReentrantLockの最初の使用法を説明しましょう。
パッケージテスト; Import java.util.concurrent.locks.reentrantlock; public classテスト実装runnable {public static reentrantlock lock = new ReentrantLock(); public static int i = 0; @Override public void run(){for(int j = 0; j <10000000; j ++){lock.lock(); {i ++; }最後に{lock.unlock(); }}} public static void main(string [] args)throws arturtedexception {test test = new test();スレッドT1 =新しいスレッド(テスト);スレッドT2 =新しいスレッド(テスト); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); }}iで++操作を実行する2つのスレッドがあります。スレッドの安全性を確保するために、ReentrantLockが使用されます。使用状況から、同期されたものと比較して、ReentrantLockはもう少し複雑であることがわかります。ロック解除操作は最終的に実行する必要があるため、最終的にロック解除されていない場合、コードに例外があり、ロックがリリースされず、同期がJVMによってリリースされる可能性があります。
では、ReentrantLockの優れた特徴は何ですか?
1.1.1再突入
単一のスレッドは繰り返し入力できますが、繰り返し終了する必要があります
lock.lock(); lock.lock(); try {i ++; }最後に{lock.unlock(); lock.unlock();}ReentrantLockはReentrant Lockであるため、同じロックを繰り返し取得できます。これには、ロック関連の取得カウンターがあります。ロックを所有するスレッドが再びロックを取得すると、取得カウンターは1増加し、実際のリリース(Reentrant Lock)を取得するには、ロックを2回リリースする必要があります。これは、同期のセマンティクスを模倣します。スレッドが、スレッドが既に持っているモニターによって保護されている同期ブロックを入力すると、スレッドは続行できます。スレッドが2番目の(またはその後の)同期ブロックを終了すると、ロックは放出されません。ロックは、スレッドが入力するモニターによって保護された最初の同期ブロックを終了するときにのみ放出されます。
パブリッククラスの子供は、父親を実装します。 }} public synchronized void dosomething(){system.out.println( "1child.dosomething()"); doanotherthing(); //自分のクラスで他の同期メソッドを呼び出す} private同期void doanotherthing(){super.dosomething(); //親クラスsystem.out.println( "3child.doanotherthing()")の同期メソッドを呼び出します。 } @Override public void run(){child.dosomething(); }} class father {public synchronized void dosomething(){system.out.println( "2father.dosomething()"); }}スレッドが異なる同期されたメソッドに入り、以前に得られたロックを放出しないことがわかります。したがって、出力はまだ連続しています。したがって、同期はリエントラントロックでもあります
出力:
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
1child.dosomething()
2father.dosomething()
3child.doanotherthing()
...
1.1.2。中断可能
同期とは異なり、ReentrantLockは割り込みに応答します。割り込み関連の知識ビュー[ハイコンカレンシーJava 2]マルチスレッドの基本
通常のlock.lock()は割り込みに応答できません。lock.lockinterrumdibly()は、割り込みに応答できます。
デッドロックシーンをシミュレートしてから、割り込みを使用してデッドロックに対処します
パッケージテスト; java.lang.management.managementfactory; import java.lang.management.threadinfo; import java.lang.management.threadmxbean; import java.util.concurrent.locks.reentrantlock; public class test runnable {public reintrantlock lock1 = new reintrantlock; public static reentrantlock lock2 = new ReentrantLock(); int lock;パブリックテスト(int lock){this.lock = lock; } @Override public void run(){try {if(lock == 1){lock1.lockinterruptibly(); {thread.sleep(500)を試してください。 } catch(Exception e){// todo:handle例外} lock2.lockinterrumdibly(); } else {lock2.lockinterrumdibly(); {thread.sleep(500)を試してください。 } catch(Exception e){// todo:handle exception} lock1.lockinterrumdibly(); }} catch(Exception e){// todo:handle例外}最後に{if(lock1.isheldbycurrentthread()){lock1.unlock(); } if(lock2.isheldbycurrentthread()){lock2.unlock(); } system.out.println(thread.currentthread()。getId() + ":thread exit"); }} public static void main(string [] args)throws arturtedexception {test t1 = new test(1);テストT2 =新しいテスト(2);スレッドスレッド1 =新しいスレッド(T1);スレッドスレッド2 =新しいスレッド(T2); thread1.start(); thread2.start(); thread.sleep(1000); //DEADLOCKCHECKER.CHECK(); } static class deadlockchecker {private final static threadmxbean = managementfactory .getThreadMXBean(); final static runnable deadlockchecker = new runnable(){@override public void run(){// todo auto-eneratedメソッドスタブwhile(true){long [] deadlockedthreadids = mbean.finddeadLockedThreads(); if(deadlockedthreadids!= null){threadInfo [] swerchInfos = mbean.getThreadInfo(deadlockedThreadIds); for(thread t:thread.getAllStackTraces()。keyset()){for(int i = 0; i <shoodinfos.length; i ++){if(t.getid()== threadinfos [i] .getthreadid()){t.interput(); }}}}} try {thread.sleep(5000); } catch(Exception e){// TODO:例外を処理}}}}}; public static void check(){thread t = newスレッド(deadlockchecker); T.SetDaemon(True); t.start(); }}}上記のコードはデッドロックを引き起こす可能性があり、スレッド1はlock1を取得し、スレッド2はlock2を取得し、互いのロックを取得したい場合があります。
上記のコードを実行した後、jstackを使用して状況を表示します
デッドロックが実際に発見されました。
DeadLockChecker.Check();メソッドは、デッドロックを検出し、デッドロックスレッドを中断するために使用されます。中断後、スレッドは正常に終了します。
1.1.3。時間制限
タイムアウトがロックを取得できない場合、それは偽りを返し、デッドロックを形成するために永久に待機しません。
lock.trylock(long timeout、timeUnitユニット)を使用して、時間と単位のパラメーターを備えた時間に程度のロックを実装します。
時間を制限できることを説明するための例を挙げましょう。
パッケージテスト; Import java.util.concurrent.timeunit; Import java.util.concurrent.locks.reentrantlock; public classテスト実装{public static reentrantlock lock = new ReentrantLock(); @Override public void run(){try {if(lock.trylock(5、timeunit.seconds)){shood.sleep(6000); } else {system.out.println( "lock failed"); }} catch(Exception E){}最後に{if(lock.isheldbycurrentthread()){lock.unlock(); }}} public static void main(string [] args){test t = new test();スレッドT1 =新しいスレッド(t);スレッドT2 =新しいスレッド(t); t1.start(); t2.start(); }}2つのスレッドを使用して、ロックを競います。スレッドがロックを取得すると、6秒間睡眠をとり、各スレッドは5秒間ロックを取得しようとします。
したがって、ロックを取得できないスレッドが必要です。あなたがそれを取得できない場合、あなたは直接終了します。
出力:
ロックに失敗します
1.1.4。フェアロック
使い方:
Public ReentrantLock(ブールフェア)
public static reentrantlock fairlock = new ReentrantLock(true);
一般的にロックは不公平です。最初に来るスレッドが最初にロックを取得できることは必ずしも可能ではありませんが、後で来るスレッドは後でロックを取得します。不公平なロックは空腹を引き起こす可能性があります。
フェアロックとは、このロックがスレッドが最初に来て、最初にロックを取得することを保証できることを意味します。公正なロックは空腹を引き起こすことはありませんが、フェアロックのパフォーマンスは、フェア以外のロックのパフォーマンスよりもはるかに悪化します。
1.2条件
条件とReentrantLockの関係は、同期とobject.wait()/signal()に似ています
await()メソッドにより、現在のスレッドが待機し、現在のロックが解放されます。信号()が他のスレッドまたはSignalall()メソッドで使用されると、スレッドはロックを取り戻し、実行を続けます。または、スレッドが中断されたら、待機から飛び出すこともできます。これは、object.wait()メソッドに非常に似ています。
waituninterractibly()メソッドは基本的にawait()メソッドと同じですが、プロセス中に応答割り込みが待機しません。 Singal()メソッドは、待機するスレッドを目覚めさせるために使用されます。相対的なSingAlall()メソッドは、すべてのスレッドを待っています。これは、objct.notify()メソッドに非常に似ています。
ここでは詳細に紹介しません。説明する例を挙げましょう。
パッケージテスト; Import java.util.concurrent.locks.condition; Import java.util.concurrent.locks.reentrantlock; public classテスト実装{public static reintrantlock lock = new ReentrantLock(); public static条件条件= lock.newcondition(); @Override public void run(){try {lock.lock(); condition.await(); system.out.println( "スレッドが進行している"); } catch(Exception e){e.printstacktrace(); }最後に{lock.unlock(); }} public static void main(string [] args)throws arturtedexception {test t = new test();スレッドスレッド=新しいスレッド(t); thread.start(); Thread.Sleep(2000); lock.lock(); condition.signal(); lock.unlock(); }}上記の例は非常に簡単です。スレッドを待って、メインスレッドを起こさせます。 condition.await()/信号は、ロックを取得した後にのみ使用できます。
1.3.Semaphore
ロックの場合、相互に排他的です。それは、私がロックを取得する限り、誰もそれを再び手に入れることができないことを意味します。
セマフォの場合、複数のスレッドが同時にクリティカルセクションに入ることができます。共有ロックと見なすことができますが、共有制限は制限されています。制限が使用された後、制限を取得していない他のスレッドは、クリティカルエリアの外側の依然としてブロックされます。金額が1の場合、ロックするのと同等です
これが例です:
パッケージテスト; Import java.util.concurrent.executorservice; Import java.util.concurrent.executors; Import java.util.concurrent.semaphore; public class test implations urnable {final semaphore semaphore = new semaphore(5); @Override public void run(){try {semaphore.acquire(); Thread.Sleep(2000); system.out.println(thread.currentthread()。getId() + "done"); } catch(Exception e){e.printstacktrace(); }最後に{semaphore.release(); }} public static void main(string [] args)throws arturnedexception {executorservice executorservice = executors.newfixedthreadpool(20);最終テストt = new Test(); for(int i = 0; i <20; i ++){executorservice.submit(t); }}}20個のスレッドを備えたスレッドプールがあり、各スレッドはSemaphoreのライセンスに移動します。セマフォには5つのライセンスしかありません。実行後、5がバッチに出力され、バッチが出力されていることがわかります。
もちろん、1つのスレッドも一度に複数のライセンスに適用できます
public void acture(int permities)スローが中断されたexception
1.4 readwritelock
readwritelockは、関数を区別するロックです。読み取りと執筆は2つの異なる機能です。読み取り読み取りは相互に排他的ではなく、読み取りワイトは相互に排他的であり、書き込みワイトは相互に排他的です。
この設計により、同時性が向上し、データセキュリティが保証されます。
使い方:
private static reintrantreadwritelock readwritelock = new reintrantreadwritelock();
private static lock readlock = readwriteLock.readLock();
private static lock writelock = readwritelock.writeLock();
詳細な例については、プロデューサーと消費者の問題、読者と作家の問題のJavaの実装を表示できます。ここでは拡張しません。
1.5 CountDownLatch
カウントダウンタイマーの典型的なシナリオは、ロケット発売です。ロケットが発射される前に、すべてが絶対確実であることを確認するために、さまざまな機器や楽器の検査がしばしば実行されます。エンジンは、すべての検査が完了した後にのみ点火することができます。このシナリオは、CountDownLatchに非常に適しています。実行する前にすべてのチェックスレッドが完了するまでイグニッションスレッドを待つことができます
使い方:
静的な最終カウントダウンラッチend = new CountDownLatch(10);
end.countdown();
end.await();
概略図:
簡単な例:
パッケージテスト; java.util.concurrent.countdownlatch;インポートjava.util.concurrent.executorservice; Import java.util.concurrent.executors; public countdownlatch countdownlatch = new countdownlatch(10);静的最終テストt = new Test(); @Override public void run(){try {thread.sleep(2000); system.out.println( "complete"); CountDownLatch.CountDown(); } catch(Exception e){e.printstacktrace(); }} public static void main(string [] args)throws arturnedexception {executorservice executorservice = executors.newfixedthreadpool(10); for(int i = 0; i <10; i ++){executorservice.execute(t); } countDownLatch.Await(); system.out.println( "end"); executorservice.shutdown(); }}メインスレッドは、「終了」を出力する前に、10個のスレッドすべてが実行されるのを待つ必要があります。
1.6 Cyclicbarrier
CountDownLatchと同様に、実行する前にいくつかのスレッドが完了するのを待っています。 CountDownLatchの違いは、このカウンターが繰り返し使用できることです。たとえば、カウンターを10に設定したとします。次に、10個のスレッドの最初のバッチを収集した後、カウンターはゼロに戻り、次の10個のスレッドのバッチを収集します
使い方:
パブリックサイクリックバリア(intパーティー、実行可能な障壁)
バリエラションとは、カウンターが1回カウントされたときにシステムが実行するアクションです。
待つ()
概略図:
これが例です:
パッケージテスト; Import java.util.concurrent.cyclicbarrier; public class Test Implements runnable {private string soldier;プライベートファイナルサイクリックバリエサイクリック。パブリックテスト(String Soldier、Cyclicbarrier Cyclic){this.soldier = soldier; this.cyclic = cyclic; } @Override public void run(){try {//すべての兵士がcyclic.await(); dowork(); //すべての兵士が作業を完了するのを待ちます。 } catch(例外e){// todo auto-enerated catch block e.printstacktrace(); }} private void dowork(){// dodo auto-enerated method stib try {thread.sleep(3000); } catch(Exception e){// todo:handle例外} system.out.println(soldier + ":done"); } public static class Barrierrunは実行可能{boolean flag; int n; Public Barrierrun(Boolean Flag、int n){super(); this.flag = flag; this.n = n; } @Override public void run(){if(flag){system.out.println(n + "タスク完了"); } else {system.out.println(n + "set compley"); flag = true; }}} public static void main(string [] args){final int n = 10;スレッド[]スレッド= newスレッド[n];ブールフラグ= false; Cyclicbarrier Barrier = new Cyclicbarrier(n、new Barrierrun(flag、n)); System.out.println( "set"); for(int i = 0; i <n; i ++){system.out.println(i+"report");スレッド[i] = new Thread(new Test( "Soldier" + I、Barrier));スレッド[i] .start(); }}}印刷結果:
集める
0レポート
1つのレポート
2つのレポート
3つのレポート
4つのレポート
5つのレポート
6つのレポート
7つのレポート
8つのレポート
9つのレポート
10セットの完全な兵士5:完了
兵士7:終了
兵士8:終了
兵士3:終了
兵士4:終了
兵士1:終了
兵士6:終了
兵士2:終了
兵士0:終了
兵士9:終了
10のタスクが完了しました
1.7 Locksupport
スレッドブロッキングプリミティブを提供します
サスペンドに似ています
locksupport.park();
locksupport.unpark(T1);
サスペンドと比較して、糸の凍結を引き起こすのは容易ではありません。
Locksupportのアイデアは、セマフォに多少似ています。内部ライセンスがあります。駐車時にこのライセンスを取り除き、普及しているときにこのライセンスを申請します。したがって、公園の前に登場する場合、糸の凍結は発生しません。
次のコードは、[高並行性Java 2]マルチスレッドファンデーションのサスペンドサンプルコードです。サスペンドを使用するときにデッドロックが発生します。
パッケージテスト; java.util.concurrent.locks.locksupportのインポート;パブリッククラステスト{staticオブジェクトu = new object(); static testsuspendthread t1 = new testSusPendThread( "T1"); static testsuspendthread t2 = new testSusPendThread( "T2"); public static class testsuspendthread extends thread {public testsuspendthread(string name){setName(name); } @Override public void run(){synchronized(u){system.out.println( "in" + getName()); //thread.currentthread()。suspend(); locksupport.park(); }}} public static void main(string [] args)throws arturtedexception {t1.start(); thread.sleep(100); t2.start(); // t1.resume(); // t2.resume(); locksupport.unpark(T1); locksupport.unpark(T2); t1.join(); t2.join(); }}ただし、Locksupportを使用しても、デッドロックは発生しません。
加えて
park()は割り込みに応答できますが、例外をスローしません。割り込み応答の結果、Park()関数のreturnは、thread.interrupded()から割り込みフラグを取得できます。
JDKにはParkを使用する場所はたくさんあります。もちろん、Locksupportの実装もUnsafe.park()を使用して実装されています。
public static void park(){
unsafe.park(false、0l);
}
1.8 ReentrantLockの実装
ReentrantLockの実装を紹介しましょう。 ReentrantLockの実装は、主に3つの部分で構成されています。
ReentrantLockの親クラスには、同期状態を表す状態変数があります。
/***同期状態。 */民間揮発性INT状態;
CAS操作を通じてロックを取得するように状態を設定します。 1に設定すると、ロックホルダーが現在のスレッドに与えられます
final void lock(){if(compareandsetState(0、1))setExclusiveOwnErthRead(thread.currentThread());それ以外の場合は取得(1); }ロックが成功しない場合、アプリケーションが作成されます
Public Final void acchire(int arg){if(!tryacquire(arg)&& quackirequeued(addwaiter(node.exclusive)、arg))self interput(); }まず、別のスレッドがロックをリリースした可能性があるため、適用後にTryAcquireを試してください。
まだロックを申請していない場合は、ウェイターを追加してください。
プライベートノードaddwaiter(ノードモード){node node = new node(thread.currentthread()、mode); // ENQの高速パスを試してください。障害ノードpred = tailで完全なENQにバックアップします。 if(pred!= null){node.prev = pred; if(compareandsettail(pred、node)){pred.next = node;ノードを返す; }} enq(node);ノードを返す; }この期間中に、ロックを申請するための多くの試みがあり、まだ申請できない場合は、ハングアップします。
プライベートファイナルブールParkandCheckEnterrupt(){locksupport.park(this); return thread.interrupded(); }同様に、ロックがリリースされ、その後、Proarkがここで詳細に説明されていない場合。
2。同時コンテナと典型的なソースコード分析
2.1 CONCURRENTHASHMAP
Hashmapはスレッドセーフコンテナではないことを知っています。ハッシュマップスレッドセーフを作成する最も簡単な方法は、使用することです
collections.synchronizedMap、それはハッシュマップのラッパーです
public static Map m = collections.synchronizedMap(new Hashmap());
同様に、リストの場合、SETも同様の方法を提供します。
ただし、この方法は、同時量が比較的少ない場合にのみ適しています。
SynchronizedMapの実装を見てみましょう
プライベート最終マップ<K、V> M; //バッキングマップ最終オブジェクトMutex; //同期するオブジェクトSynchronizedMap(Map <K、V> M){if(m == null)throw new NullPointerException(); this.m = m; mutex = this; } synchronizedMap(map <k、v> m、object mutex){this.m = m; this.mutex = mutex; } public int size(){synchronized(mutex){return m.size();}} public boolean isempty(){synchronized(mutex){return m.isempty();}} public boolean (mutex){return m.containsvalue(value);}} public v get(object key){synchronized(mutex){return m.get(key);}} public v put(k key、v value){synchronized(mutex){return m.put(key、value);}} {remotinized(key key){mutronized(metex){fey quey) public void putall(map <?extends k、?extends v> map){synchronized(mutex){m.putall(map);}} public void clear(){synchronized(mutex){m.clear();}}}}ハッシュマップを内部にラップし、ハッシュマップのすべての操作を同期しました。
各メソッドは同じロック(Mutex)を取得するため、PutやRemotなどの操作は相互に排他的であり、同時性の量を大幅に削減することを意味します。
Concurrenthashmapがどのように実装されているかを見てみましょう
public v put(k key、v value){segment <k、v> s; if(value == null)新しいnullpointerexception(); int hash = hash(key); int j =(hash >>> segmentshift)&segmentmask; if((s =(segment <k、v>)unsafe.getObject // nonvolatile; recheck(segments、(j << sshift) + sbase))== null)// return s.put(key、hash、value、false); }Concurrenthashmap内にはセグメントセグメントがあり、大きなハッシュマップをいくつかのセグメント(小さなハッシュマップ)に分割し、各セグメントのデータをハッシュします。このようにして、異なるセグメント上の複数のスレッドのハッシュ操作はスレッドセーフである必要があるため、同じセグメントのスレッドを同期するだけで、ロックの分離が実現し、同時性が大幅に増加する必要があります。
Concurrenthashmap.sizeを使用する場合、各セグメントのデータ合計をカウントする必要があるため、より面倒になります。現時点では、各セグメントにロックを追加してからデータ統計を実行する必要があります。これは、ロックを分離した後のわずかな欠点ですが、サイズの方法は高周波で呼び出されるべきではありません。
実装に関しては、同期とlock.lockを使用するのではなく、可能な限りトライロックします。同時に、HashMapの実装にもいくつかの最適化を行いました。ここでは言及しません。
2.2ブロッキングキュー
BlockingQueueは、高性能コンテナではありません。しかし、それはデータを共有するための非常に良い容器です。これは、生産者と消費者の典型的な実装です。
概略図:
詳細については、プロデューサーと消費者の問題のJava実装と、読者と作家の問題を確認できます。