Dans la fondation multi-lancement [à haute concurrence Java II], nous avons initialement mentionné les opérations de synchronisation de base de threads de base. Ce que nous voulons mentionner cette fois, c'est l'outil de contrôle de synchronisation dans le package simultané.
1. Utilisation de divers outils de contrôle de synchronisation
1.1 Reentrantlock
Reentrantlock ressemble à une version améliorée de synchronisée. La fonctionnalité de synchronisée est qu'elle est simple à utiliser et que tout reste au JVM pour le traitement, mais ses fonctions sont relativement faibles. Avant JDK1.5, les performances de Reentrantlock étaient meilleures que synchronisées. En raison de l'optimisation du JVM, les performances des deux dans la version JDK actuelle sont comparables. S'il s'agit d'une simple implémentation, n'utilisez pas délibérément ReentrantLock.
Par rapport à la synchronisée, le reentrantlock est plus fonctionnellement riche et il a les caractéristiques de la réentrante, d'interruption, de temps limité et de verrouillage équitable.
Tout d'abord, utilisons un exemple pour illustrer l'utilisation initiale de Reentrantlock:
Test de package; import java.util.concurrent.locks.reentrantlock; le test de classe publique implémente Runnable {public static reentrantlock lock = new reentrantLock (); public statique int i = 0; @Override public void run () {for (int j = 0; j <10000000; j ++) {lock.lock (); essayez {i ++; } enfin {lock.unlock (); }}} public static void main (String [] args) lève InterruptedException {test test = new test (); Thread t1 = nouveau thread (test); Thread t2 = nouveau thread (test); t1.start (); t2.start (); t1.join (); t2.join (); System.out.println (i); }}Il y a deux threads qui effectuent des opérations ++ sur i. Afin d'assurer la sécurité des filetages, Reentrantlock est utilisé. D'après l'utilisation, nous pouvons voir que par rapport à la synchronisée, le reentrantlock est un peu plus compliqué. Étant donné que l'opération de déverrouillage doit être effectuée enfin, si elle n'est pas enfin déverrouillée, il est possible que le code ait une exception et que le verrou n'est pas libéré, et Synchronisé est publié par le JVM.
Alors, quelles sont les excellentes caractéristiques de Reentrantlock?
1.1.1 Réentrée
Un seul fil peut être entré à plusieurs reprises, mais doit être sorti à plusieurs reprises
lock.lock (); lock.lock (); essayez {i ++; } enfin {lock.unlock (); lock.unlock ();}Étant donné que Reentrantlock est un verrou réentrant, vous pouvez obtenir le même verrou à plusieurs reprises, qui a un compteur d'acquisition lié à la verrou. Si un fil qui possède le verrou obtient à nouveau le verrou, le compteur d'acquisition est augmenté de 1 et le verrou doit être libéré deux fois pour obtenir la libération réelle (verrouillage réentrant). Cela imite la sémantique de synchronisée; Si le thread entre un bloc synchronisé protégé par le moniteur dont dispose déjà le thread, le thread est autorisé à continuer. Lorsque le thread quitte le deuxième bloc synchronisé (ou suivant), le verrou n'est pas libéré. Le verrouillage n'est libéré que lorsque le thread quitte le premier bloc synchronisé protégé par le moniteur qu'il entre.
Classe publique Child étend que le père implémente Runnable {final static child enfant = new Child (); // afin de s'assurer que les verrouillent un vide statique public unique (String [] args) {pour (int i = 0; i <50; i ++) {new Thread (enfant) .start (); }} public synchronisé void dosomething () {System.out.println ("1Child.dosomething ()"); Doanothething (); // Appelez d'autres méthodes synchronisées dans votre propre classe} private synchronisé void doanothething () {super.dosomething (); // Appelez la méthode synchronisée de la classe parent System.out.println ("3Child.Doanotherthing ()"); } @Override public void run () {child.dosomething (); }} classe Père {public synchronisé void dosomething () {System.out.println ("2father.dosomething ()"); }}Nous pouvons voir qu'un fil entre une méthode synchronisée différente et ne libérera pas les verrous obtenus auparavant. La sortie est donc toujours séquentiellement. Par conséquent, synchronisé est également un verrou réentrant
Sortir:
1child.dosomething ()
2father.dosomething ()
3Child.Doanotherthing ()
1child.dosomething ()
2father.dosomething ()
3Child.Doanotherthing ()
1child.dosomething ()
2father.dosomething ()
3Child.Doanotherthing ()
...
1.1.2. Interruptible
Contrairement à la synchronisation, le reentrantlock est sensible aux interruptions. Interrompre la vue de connaissances connexes [Haute concurrence Java 2] Bases de la lecture multiple
Ordinary Lock.lock () ne peut pas répondre aux interruptions, Lock.LockInterruply () peut répondre aux interruptions.
Nous simulons une scène de blocage, puis utilisons les interruptions pour faire face à l'impasse
Test de package; import java.lang.management.managementfactory; import java.lang.management.threadinfo; import java.lang.management.threadmxbean; import java.util.concurrent.locks.reentrantlock; test de classe publique implémente Runnable {public static reentrantlock Lock1 = new ReentrantLock ();); Public static reentrantLock Lock2 = new reentrantLock (); INT LOCK; test public (int lock) {this.lock = lock; } @Override public void run () {try {if (lock == 1) {lock1.lockinterruptible (); essayez {thread.sleep (500); } catch (exception e) {// todo: greg exception} lock2.lockInterruply (); } else {lock2.lockInterruply (); essayez {thread.sleep (500); } catch (exception e) {// todo: gère exception} lock1.lockinterruptiblement (); }} catch (exception e) {// todo: gère exception} enfin {if (lock1.isheldbycurrentThread ()) {lock1.unlock (); } if (lock2.isheldByCurrentThread ()) {lock2.unlock (); } System.out.println (Thread.currentThread (). GetId () + ": Thread Exit"); }} public static void main (String [] args) lève InterruptedException {test t1 = new test (1); Test t2 = nouveau test (2); Thread thread1 = nouveau thread (t1); Thread thread2 = nouveau thread (t2); thread1.start (); thread2.start (); Thread.Sleep (1000); //Deadlockchecker.check (); } classe statique daadlockChecker {private final static threadmxbean mbean = ManagementFactory .getThreadMXBean (); Final static runnable dildlockChecker = new Runnable () {@Override public void run () {// TODO Stub de méthode généré automatiquement while (true) {long [] daadlockEdThaRedid = mbean.findDeadLockEdThreads (); if (dalplockEdThaRedid! = null) {ThreadInfo [] ThreadInfos = mBean.getThreadInfo (DeadlockEdThaRedid); for (Thread t: Thread.getAllStackTraces (). KeySet ()) {for (int i = 0; i <ThreadInfos.Length; i ++) {if (t.getId () == ThreadInfos [i] .getThreaDID ()) {t.interrupt (); }}}}} try {Thread.Sleep (5000); } catch (exception e) {// todo: gère exception}}}}}; public static void check () {thread t = new thread (dildlockchecker); t.setDaemon (true); t.start (); }}}Le code ci-dessus peut provoquer des blocages, Thread 1 obtient Lock1, Thread 2 obtient Lock2, puis les uns les autres veulent se procurer les serrures.
Nous utilisons JStack pour afficher la situation après avoir exécuté le code ci-dessus
Une impasse a en effet été découverte.
The Deadlockchecker.check (); La méthode est utilisée pour détecter les blocs de bloces puis interrompre le thread de blocage. Après l'interruption, le thread sort normalement.
1.1.3. Limité dans le temps
Si le délai d'attente ne peut pas obtenir le verrou, il reviendra faux et n'attendra pas de façon permanente pour former une serrure morte.
Utilisez Lock.Trylock (temps de temps long, unité TimeUnit) pour implémenter des verrous limitables dans le temps, les paramètres étant le temps et les unités.
Permettez-moi de vous donner un exemple pour illustrer que le temps peut être limité:
Test de package; Importer java.util.concurrent.TimeUnit; import java.util.concurrent.locks.rentrantlock; le test de classe publique implémente Runnable {public static rentrantlock Lock = new ReentrantLock (); @Override public void run () {try {if (lock.trylock (5, timeunit.seconds)) {thread.sleep (6000); } else {System.out.println ("Get Lock a échoué"); }} catch (exception e) {} enfin {if (lock.isheldbyCurrentThread ()) {lock.unlock (); }}} public static void main (String [] args) {test t = new test (); Thread t1 = nouveau thread (t); Thread t2 = nouveau thread (t); t1.start (); t2.start (); }}Utilisez deux threads pour rivaliser pour un verrou. Lorsqu'un fil acquiert le verrou, dormez 6 secondes et que chaque fil essaie uniquement d'obtenir le verrou pendant 5 secondes.
Il doit donc y avoir un fil qui ne peut pas obtenir le verrou. Si vous ne pouvez pas l'obtenir, vous quitterez directement.
Sortir:
faire échouer le verrouillage
1.1.4. Verrouillage équitable
Comment utiliser:
Reentrantlock public (foire booléenne)
public static reentrantlock fairlock = new reentrantlock (true);
Les serrures en général sont injustes. Il n'est pas nécessairement possible que le fil qui arrive en premier puisse d'abord obtenir le verrou, mais le fil qui arrive plus tard obtiendra le verrou plus tard. Les serrures déloyales peuvent provoquer la faim.
Un verrouillage équitable signifie que cette serrure peut garantir que le fil passe en premier et obtient le verrou en premier. Bien que les écluses équitables ne provoqueront pas la faim, les performances des serrures équitables seront bien pires que celles des écluses non-fair.
1.2 Condition
La relation entre la condition et le reentrantlock est similaire à Synchronisé et Object.Wait () / Signal ()
La méthode Await () fera attendre le thread actuel et libérer le verrouillage actuel. Lorsque le signal () est utilisé dans d'autres threads ou la méthode Signalall (), le thread reprendra le verrou et continuera à s'exécuter. Ou lorsque le fil est interrompu, vous pouvez également sauter de l'attente. Ceci est très similaire à la méthode object.wait ().
La méthode AwaitUnterruply () est fondamentalement la même que la méthode Await (), mais elle n'attendra pas l'interruption de réponse pendant le processus. La méthode singal () est utilisée pour réveiller un fil en attente. La méthode relative singalall () réveillera tous les fils en attente. Ceci est très similaire à la méthode objct.notify ().
Je ne le présenterai pas en détail ici. Permettez-moi de vous donner un exemple pour illustrer:
Test de package; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.rentrantlock; le test de classe publique implémente Runnable {public static reentrantlock Lock = new ReentrantLock (); Condition publique de condition statique = Lock.NewCondition (); @Override public void run () {try {lock.lock (); condition.Await (); System.out.println ("Thread se passe"); } catch (exception e) {e.printStackTrace (); } enfin {lock.unlock (); }} public static void main (String [] args) lève InterruptedException {test t = new test (); Thread thread = nouveau thread (t); thread.start (); Thread.Sleep (2000); lock.lock (); condition.signal (); lock.unlock (); }}L'exemple ci-dessus est très simple. Laissez un fil attendre et laissez le fil principal le réveiller. condition.Await () / signal ne peut être utilisé qu'après avoir obtenu le verrou.
1.3. semaphore
Pour les verrous, il s'exclut mutuellement. Cela signifie que tant que j'obtiens la serrure, personne ne peut le récupérer.
Pour le sémaphore, il permet à plusieurs threads de saisir la section critique en même temps. Il peut être considéré comme un verrou partagé, mais la limite partagée est limitée. Une fois la limite utilisée, d'autres threads qui n'ont pas obtenu la limite se bloqueront toujours en dehors de la zone critique. Lorsque le montant est 1, il est équivalent à verrouiller
Voici un exemple:
Test de package; Importer java.util.concurrent.executorService; import java.util.concurrent.executors; import java.util.concurrent.semaphore; le test de classe publique implémente Runnable {final sémaphore Semaphore = new Semaphore (5); @Override public void run () {try {semaphore.acquire (); Thread.Sleep (2000); System.out.println (thread.currentThread (). GetID () + "Done"); } catch (exception e) {e.printStackTrace (); } Enfin {Semaphore.release (); }} public static void main (String [] args) lève InterruptedException {ExecutorService ExecutorService = exécutor.NewFixEdThreadPool (20); Test final t = nouveau test (); pour (int i = 0; i <20; i ++) {EMMIRESSERVICE.SUMMIT (T); }}}Il y a un pool de threads avec 20 threads, et chaque fil va à la licence de Semaphore. Il n'y a que 5 licences pour le sémaphore. Après l'exécution, vous pouvez voir que 5 sont sortis en lots, les lots sont sortis.
Bien sûr, un fil peut également demander plusieurs licences à la fois
Le public Void Acquire (INT Permis) lance InterruptedException
1.4 readwritelock
ReadWriteLock est un verrou qui distingue les fonctions. La lecture et l'écriture sont deux fonctions différentes: la lecture en lecture n'est pas mutuellement exclusive, la lecture-écriture s'exclue mutuellement et Write-Write s'exclue mutuellement.
Cette conception augmente la concurrence et assure la sécurité des données.
Comment utiliser:
privé static reentRanTreadWriteLock readWriteLock = new reentRanTreadWriteLock ();
Lock statique privé readlock = readWriteLock.readlock ();
Lock statique privé WriteLock = readWriteLock.WriteLock ();
Pour des exemples détaillés, vous pouvez voir la mise en œuvre Java des problèmes de producteur et de consommation et de problèmes de lecteur et d'écrivain, et je ne l'élargirai pas ici.
1,5 compte à rebours
Un scénario typique pour un compte à rebours est un lancement de fusée. Avant le lancement de la fusée, afin de s'assurer que tout est infaillible, les inspections de divers équipements et instruments sont souvent effectuées. Le moteur ne peut s'enflammer qu'après la fin des inspections. Ce scénario est très adapté à CountdownLatch. Il peut faire en sorte que le fil d'allumage attende tous les threads de contrôle avant de l'exécuter avant de l'exécuter
Comment utiliser:
Static final CountdownLatch end = new CountdownLatch (10);
end.CountDown ();
end.Await ();
Diagramme schématique:
Un exemple simple:
Test de package; Importer java.util.concurrent.countDownLatch; Importer java.util.concurrent.executorService; Importer java.util.concurrent.executors; le test de classe publique implémente Runnable {static final CountDownLatch CountdownLatch = nouveau compte à rebours (10); Test final statique t = nouveau test (); @Override public void run () {try {thread.sleep (2000); System.out.println ("complet"); CountdownLatch.CountDown (); } catch (exception e) {e.printStackTrace (); }} public static void main (String [] args) lève InterruptedException {ExecutorService ExecutorService = exécutor.NewFixEdThreadPool (10); pour (int i = 0; i <10; i ++) {EMMIRESSERVICE.EXECUTE (T); } CountdownLatch.Await (); System.out.println ("end"); ExecutorService.shutdown (); }}Le thread principal doit attendre que les 10 threads s'exécutent avant de sortir "End".
1.6 Cyclique Barrier
Semblable à CountdownLatch, il attend également que certains threads se terminent avant de les exécuter. La différence avec le compte à rebours est que ce compteur peut être utilisé à plusieurs reprises. Par exemple, supposons que nous réglions le compteur sur 10. Ensuite, après avoir rassemblé le premier lot de 10 threads, le compteur reviendra à zéro, puis rassemblant le lot suivant de 10 threads
Comment utiliser:
Public CyclicBarrier (intons int, Runnable BarrierActure)
La barrière est l'action que le système effectuera lorsque le compteur comptera une fois.
attendre()
Diagramme schématique:
Voici un exemple:
Test de package; import java.util.concurrent.cyclicbarrier; le test de classe publique implémente Runnable {private String soldat; Cyclique cyclique final privé; Test public (String Soldier, cyclicbarrier cyclique) {this.soldier = soldat; this.cyclic = cyclique; } @Override public void run () {try {// attendez que tous les soldats arrivent cyclique.await (); Dowork (); // attendez que tous les soldats terminent leur travail cyclique.Await (); } catch (exception e) {// TODO Bloc de capture généré automatiquement e.printStackTrace (); }} private void Dowork () {// TODO Méthode générée automatique Stub Try {Thread.Sleep (3000); } catch (exception e) {// todo: greg exception} System.out.println (soldat + ": fait"); } Classe statique publique BarrierRun implémente Runnable {booléen drapeau; int n; Public BarrierRun (drapeau booléen, int n) {super (); this.flag = drapeau; this.n = n; } @Override public void run () {if (flag) {System.out.println (n + "tâche complète"); } else {System.out.println (n + "set complétion"); Flag = true; }}} public static void main (String [] args) {final int n = 10; Thread [] threads = nouveau thread [n]; booléen drapeau = false; Cyclicbarrier Barrier = new CyclicBarrier (n, new BarrierRun (drapeau, n)); System.out.println ("set"); pour (int i = 0; i <n; i ++) {System.out.println (i + "rapport"); threads [i] = nouveau thread (nouveau test ("soldat" + i, barrière)); Threads [i] .start (); }}}Résultat d'impression:
rassembler
0 rapports
1 rapport
2 rapports
3 rapports
4 rapports
5 rapports
6 rapports
7 rapports
8 rapports
9 rapports
10 sets Soldier complet 5: Terminé
Soldat 7: Terminé
Soldat 8: fait
Soldat 3: fait
Soldat 4: fait
Soldat 1: fait
Soldat 6: Terminé
Soldat 2: fait
Soldat 0: fait
Soldat 9: fait
10 tâches effectuées
1.7 Locksupport
Fournir un blocage de fil primitif
Semblable à la suspension
Locksupport.park ();
Locksupport.unpark (T1);
Comparé à la suspension, il n'est pas facile de provoquer un gel des fils.
L'idée de Locksupport est quelque peu similaire au sémaphore. Il a une licence interne. Il enlève cette licence lorsqu'il est stationné et demande cette licence lorsqu'il est désintéressé. Par conséquent, si un désarme est avant le parc, la congélation de fil ne se produira pas.
Le code suivant est l'exemple de code de suspension dans la [haute concurrence Java 2] Foundation multi-lancement. Une impasse se produit lors de l'utilisation de la suspension.
Test de package; import java.util.concurrent.locks.lockSupport; classe publique test {objet statique u = nouveau objet (); TestsUsPendThread statique T1 = nouveau TestSUsPendThread ("T1"); TestsUsPendThread statique T2 = nouveau TestSUsPendThread ("T2"); classe statique publique TestSUsPendThread étend Thread {public TestSUsPendThread (String Name) {setName (name); } @Override public void run () {synchronisé (u) {System.out.println ("dans" + getName ()); //Thread.currentThread (). Suspende (); Locksupport.park (); }}} public static void main (String [] args) lève InterruptedException {t1.start (); Thread.Sleep (100); t2.start (); // t1.resume (); // t2.Resume (); Locksupport.unpark (T1); Locksupport.unpark (T2); t1.join (); t2.join (); }}Cependant, l'utilisation de Locksupport ne provoquera pas de blocages.
en outre
Park () peut répondre aux interruptions, mais ne lance pas des exceptions. Le résultat de la réponse d'interruption est que le retour de la fonction Park () peut obtenir l'indicateur d'interruption à partir de thread.interrupted ().
Il existe de nombreux endroits dans JDK qui utilisent le parc, bien sûr, la mise en œuvre de Locksupport est également mise en œuvre à l'aide de danget.Park ().
Park Void statique public () {
disafe.park (false, 0l);
}
1.8 Implémentation de ReentrantLock
Présentez la mise en œuvre de ReentrantLock. La mise en œuvre de Reentrantlock est principalement composée de trois parties:
La classe parent de ReentrantLock aura une variable d'état pour représenter l'état synchrone.
/ ** * l'état de synchronisation. * / État int privé volatile;
Définissez l'état pour acquérir l'opération CAS via CAS. S'il est réglé sur 1, le support de verrouillage est donné au fil actuel
VOID LOCK FINAL () {if (comparabledSetState (0, 1)) setExclusiveOwnerThread (Thread.CurrentThread ()); sinon acquérir (1); }Si le verrouillage n'est pas réussi, une application sera faite
public final void Acquire (int arg) {if (! tryacquire (arg) && acquirequeUeUe (addWaitter (node.exclusive), arg)) selfinterrupt (); }Tout d'abord, essayez Tryacquire après l'application, car un autre fil peut avoir libéré le verrou.
Si vous n'avez toujours pas postulé pour le verrou, ajoutez le serveur, ce qui signifie vous ajouter à la file d'attente d'attente
nœud privé addWaitter (mode nœud) {nœud node = new nœud (thread.currentThread (), mode); // Essayez le chemin rapide de ENQ; Sauvegarde vers Full enq sur le nœud de défaillance Pred = Tail; if (pred! = null) {node.prev = pred; if (comparabledSettail (pred, node)) {pred.next = node; Node de retour; }} enq (nœud); Node de retour; }Au cours de cette période, il y aura de nombreuses tentatives pour postuler pour une serrure, et si vous ne pouvez toujours pas postuler, vous serez raccroché.
Final privé boolean ParkandCheckInterrupt () {lockSupport.park (this); return thread.interrupted (); }De même, si le verrou est libéré, puis Undem n'est pas discuté en détail ici.
2. Répétreur simultané et analyse de code source typique
2.1 Concurrenthashmap
Nous savons que HashMap n'est pas un conteneur en file d'attente. La façon la plus simple de faire du thread hashmap est d'utiliser
Collections.SynchronizedMap, c'est un wrapper pour hashmap
Map statique publique m = collection.SynchronizedMap (new HashMap ());
De même, pour la liste, SET fournit également des méthodes similaires.
Cependant, cette méthode ne convient que pour les cas où le montant de la concurrence est relativement faible.
Jetons un coup d'œil à l'implémentation de SynchronizedMap
Carte finale privée <k, v> m; // Backing Map Final Object Mutex; // Objet sur lequel synchroniser synchronisé (map <k, v> m) {if (m == null) lancez new nullpointerException (); this.m = m; mutex = this; } Synchronisémap (map <k, v> m, objet mutex) {this.m = m; this.mutex = mutex; } public int size () {synchronisé (mutex) {return m.size ();}} public boolean isEmpty () {synchronisé (mutex) {return m.isempty ();}} public boolean contientKey (objet) {synchronisé (mutex) {return mContransKey (key); otsy} public booolean contenant-values) (key); otsy} public contes {synchronisé (mutex) {return m.ConTainsValue (valeur);}} public v get (clé d'objet) {synchronisé (mutex) {return m.get (key);}} public v put (k key, v valeur) {synchronisé (Mutex) {return M.put (keate, value);}} public v demove M.Remove (key);}} public void putall (map <? étend k ,? étend v> map) {synchronisé (mutex) {m.putall (map);}} public void clear () {synchronisé (mutex) {M.Clear ();}}Il enveloppe le hashmap à l'intérieur puis synchronisé chaque opération du hashmap.
Étant donné que chaque méthode acquiert le même serrure (mutex), cela signifie que les opérations telles que le put et la suppression s'excluent mutuellement, réduisant considérablement la quantité de concurrence.
Voyons comment concurrenthashmap est mis en œuvre
public v put (k key, v valeur) {segment <k, v> s; if (value == null) lance un nouveau nullpointerException (); int hash = hash (key); int j = (hash >>> segmentshift) & segmentMask; if ((s = (segment <k, v>) unsetafe.getObject // non volatile; reheck (segments, (j << sshift) + sbase)) == null) // dans assurresegment s = assuresegment (j); return s.put (clé, hachage, valeur, false); }Il y a un segment de segment à l'intérieur du simulachashmap, qui divise le grand hashmap en plusieurs segments (petit hashmap), puis hachent les données sur chaque segment. De cette façon, les opérations de hachage de plusieurs threads sur différents segments doivent être en file d'attente, vous n'avez donc qu'à synchroniser les threads sur le même segment, ce qui réalise la séparation des serrures et augmente considérablement la concurrence.
Il sera plus gênant lors de l'utilisation de ConcurrentHashMap.Size car il doit compter la somme de données de chaque segment. Pour le moment, vous devez ajouter des verrous à chaque segment, puis faire des statistiques de données. Il s'agit d'un petit inconvénient après la séparation du verrou, mais la méthode de taille ne doit pas être appelée à haute fréquence.
En termes de mise en œuvre, nous n'utilisons pas Synchronisé et Lock.Lock mais essayez autant que possible. Dans le même temps, nous avons également fait quelques optimisations dans la mise en œuvre de HashMap. Je ne le mentionnerai pas ici.
2.2 BlockingQueue
BlockingQueue n'est pas un conteneur haute performance. Mais c'est un très bon conteneur pour partager des données. Il s'agit d'une implémentation typique des producteurs et des consommateurs.
Diagramme schématique:
Pour plus de détails, vous pouvez consulter la mise en œuvre Java des problèmes de producteur et de consommation et des problèmes de lecteur et d'écrivain.