En la base de múltiples subprocesos [de alta concurrencia Java II], inicialmente hemos mencionado las operaciones básicas de sincronización de hilos. Lo que queremos mencionar esta vez es la herramienta de control de sincronización en el paquete concurrente.
1. Uso de varias herramientas de control de sincronización
1.1 Reentrantlock
Reentrantlock se siente como una versión mejorada de sincronizado. La característica de sincronizado es que es simple de usar y todo se deja al JVM para su procesamiento, pero sus funciones son relativamente débiles. Antes de JDK1.5, el rendimiento de Reentrantlock fue mejor que sincronizado. Debido a la optimización del JVM, el rendimiento de los dos en la versión JDK actual es comparable. Si se trata de una implementación simple, no use deliberadamente ReentrantLock.
En comparación con el sincronizado, Reentrantlock es más funcionalmente rico, y tiene las características del tiempo reentrante, interrumpible, limitado y de bloqueo justo.
Primero, usemos un ejemplo para ilustrar el uso inicial de Reentrantlock:
Prueba de paquete; import java.util.concurrent.locks.reentrantlock; la prueba de clase pública implementa runnable {public static reentrantlock Lock = new ReentrantLock (); Público estático int i = 0; @Override public void run () {for (int j = 0; j <10000000; j ++) {lock.lock (); intente {i ++; } Finalmente {Lock.unlock (); }}} public static void main (string [] args) lanza interruptedException {test test = new test (); Hilo t1 = nuevo hilo (prueba); Hilo t2 = nuevo hilo (prueba); t1.start (); t2.start (); t1.Join (); t2.join (); System.out.println (i); }}Hay dos hilos que realizan operaciones ++ en i. Para garantizar la seguridad de los subprocesos, se utiliza un reentrantlock. A partir del uso, podemos ver que en comparación con sincronizado, Reentrantlock es un poco más complicado. Debido a que la operación de desbloqueo debe realizarse finalmente, si no se desbloquea finalmente, es posible que el código tenga una excepción y el bloqueo no se lance, y el JVM libera sincronizado.
Entonces, ¿cuáles son las excelentes características de Reentrantlock?
1.1.1 reingreso
Se puede ingresar repetidamente un hilo único, pero debe salir repetidamente
Lock.lock (); Lock.lock (); intente {i ++; } Finalmente {Lock.unlock (); Lock.unlock ();}Dado que Reentrantlock es un bloqueo reentrante, puede obtener el mismo bloqueo repetidamente, que tiene un mostrador de adquisición relacionado con el bloqueo. Si un hilo propietario de la cerradura vuelve a obtener el bloqueo, el mostrador de adquisición aumenta en 1, y el bloqueo debe liberarse dos veces para obtener la liberación real (bloqueo reentrante). Esto imita la semántica de sincronizado; Si el hilo ingresa un bloque sincronizado protegido por el monitor que el hilo ya tiene, el hilo puede continuar. Cuando el hilo sale del segundo (o posterior) bloque sincronizado, no se libera el bloqueo. El bloqueo solo se libera cuando el hilo sale del primer bloque sincronizado protegido por el monitor que ingresa.
Public Class Child extiende el padre Implementa Runnable {Final Static Child = New Child (); // para garantizar que bloquee un único público void público void (string [] args) {para (int i = 0; i <50; i ++) {nuevo hilo (niño) .Start (); }} public sincronizado void dosomething () {System.out.println ("1Child.DoSomething ()"); doanththing (); // llamar a otros métodos sincronizados en su propia clase} privado sincronizado vacío doanothing () {super.dosomthing (); // llame al método sincronizado de la clase principal System.out.println ("3child.doanothing ()"); } @Override public void run () {child.dosomthething (); }} Class Father {public sincronizado void dosomething () {System.out.println ("2father.Dosomthing ()"); }}Podemos ver que un hilo ingresa un método sincronizado diferente y no liberará los bloqueos obtenidos antes. Entonces la salida sigue siendo secuencial. Por lo tanto, sincronizado también es un bloqueo reentrante
Producción:
1Child.DoSomething ()
2father.DoSomething ()
3child.doanotherththing ()
1Child.DoSomething ()
2father.DoSomething ()
3child.doanotherththing ()
1Child.DoSomething ()
2father.DoSomething ()
3child.doanotherththing ()
...
1.1.2. Interrupciones
A diferencia de sincronizado, Reentrantlock responde a las interrupciones. Vista de conocimiento relacionada con la interrupción [Alta concurrencia Java 2] Conceptos básicos de lectura múltiple
Ordinary Lock.Lock () no puede responder a las interrupciones, Lock.LockInterruption () puede responder a las interrupciones.
Simulamos una escena de punto muerto y luego usamos interrupciones para lidiar con el punto muerto
Prueba de paquete; import java.lang.management.ManagementFactory; import java.lang.management.threadinfo; import java.lang.management.threadmxbean; import java.util.concurrent.locks.reentrantlock; implementaciones de clase pública ejecutables {pública estaticación reentrantlock de reentllock = nuevo reentRantlock ();); Public static reentrantlock Lock2 = new ReentrantLock (); int bloqueo; Test Public (int Lock) {this.lock = Lock; } @Override public void run () {try {if (Lock == 1) {Lock1.LockInterruptable (); intente {thread.sleep (500); } catch (Exception e) {// tODO: manejar excepción} list22.lockinterruptable (); } else {Lock2.LockInterruptable (); intente {thread.sleep (500); } Catch (Exception e) {// tODO: manejar excepción} Lock1.lockInterruptable (); }} Catch (Exception e) {// tODO: manejar excepción} finalmente {if (lock1.isheldByCurrentThread ()) {Lock1.unlock (); } if (lock2.isheldByCurrentThread ()) {Lock2.unlock (); } System.out.println (thread.currentThread (). GetId () + ": salida de hilo"); }} public static void main (string [] args) lanza interruptedException {test t1 = new test (1); Prueba t2 = nueva prueba (2); Thread Thread1 = nuevo hilo (T1); Thread Thread2 = nuevo hilo (T2); Thread1.Start (); thread2.start (); Thread.sleep (1000); //Deadlockchecker.check (); } Class estática DeadLockChecker {private final static threadmxbean mbean = gestorFactory .getThreadMxBean (); Final static runnable DeadLockChecker = new Runnable () {@Override public void run () {// toDO Auto-Generated Stub while (true) {long [] DeadlockedThreadIDS = mBean.findDeadLockedThreads (); if (DeadLockedThreadids! = NULL) {threadInfo [] threinfos = mBean.getThreadInfo (DeadLockedThreadIDS); for (Thread t: Thread.getAllStackTraces (). KeySet ()) {for (int i = 0; i <threadinfos.length; i ++) {if (t.getid () == threadinfos [i] .getThreadID ()) {t.interrupt (); }}}}} try {thread.sleep (5000); } catch (Exception e) {// tODO: manejar excepción}}}}}; public static void check () {hilo t = nuevo hilo (Deadlockchecker); T.SetDaemon (verdadero); t.Start (); }}}El código anterior puede causar puntos muertos, el hilo 1 obtiene bloqueo1, hilo 2 obtiene bloqueo2 y luego el uno al otro quieren obtener las cerraduras del otro.
Usamos Jstack para ver la situación después de ejecutar el código anterior
De hecho, se descubrió un punto muerto.
The DeadLockChecker.Check (); El método se utiliza para detectar puntos muertos e interrumpir el hilo de punto muerto. Después de la interrupción, el hilo sale normalmente.
1.1.3. De tiempo limitado
Si el tiempo de espera no puede obtener el bloqueo, volverá falso y no esperará permanentemente para formar un bloqueo muerto.
Use Lock.trylock (tiempo de espera largo, unidad de tiempo de tiempo) para implementar cerraduras limitables en el tiempo, con los parámetros de tiempo y unidades.
Déjame darte un ejemplo para ilustrar que el tiempo puede ser limitado:
Prueba de paquete; import java.util.concurrent.timeunit; import java.util.concurrent.locks.reentrantlock; la prueba de clase pública implementa runnable {public static reentrantlock bloqueo = new ReentrantLock (); @Override public void run () {try {if (Lock.trylock (5, TimeUnit.Seconds)) {Thread.sleep (6000); } else {System.out.println ("Get Lock Falling"); }} Catch (Exception e) {} finalmente {if (lock.isheldByCurrentThread ()) {Lock.unlock (); }}} public static void main (string [] args) {test t = new test (); Hilo t1 = nuevo hilo (t); Hilo t2 = nuevo hilo (t); t1.start (); t2.start (); }}Use dos hilos para competir por un bloqueo. Cuando un hilo adquiere el bloqueo, duerma 6 segundos, y cada hilo solo trata de obtener el bloqueo durante 5 segundos.
Entonces debe haber un hilo que no pueda obtener el bloqueo. Si no puede obtenerlo, saldrá directamente.
Producción:
Falló el bloqueo
1.1.4. Mechón justo
Cómo usar:
Reentrantlock público (feria booleana)
Public static reentrantlock fairlock = nuevo reentrantlock (verdadero);
Las cerraduras en general son injustas. No es necesariamente posible que el hilo que viene primero pueda obtener el bloqueo primero, pero el hilo que viene más tarde obtendrá el bloqueo más tarde. Las cerraduras injustas pueden causar hambre.
Un bloqueo justo significa que este bloqueo puede garantizar que el hilo llegue primero y obtenga el bloqueo primero. Aunque las cerraduras justas no causarán hambre, el rendimiento de las cerraduras justas será mucho peor que la de los bloqueos no ferro.
1.2 condición
La relación entre condición y reentrantlock es similar a sincronizado y objeto.wait ()/señal ()
El método Await () hará que el hilo actual espere y liberará el bloqueo actual. Cuando la señal () se usa en otras hilos o en el método SignalAll (), el hilo recuperará el bloqueo y continuará ejecutando. O cuando se interrumpe el hilo, también puedes saltar de la espera. Esto es muy similar al método Object.Wait ().
El método AwaitUnInterrumty () es básicamente el mismo que el método Await (), pero no esperará la interrupción de la respuesta durante el proceso. El método Singal () se usa para despertar un hilo que espera. El método relativo Singalall () despertará todos los hilos que esperan. Esto es muy similar al método objct.notify ().
No lo presentaré en detalle aquí. Déjame darte un ejemplo para ilustrar:
Prueba de paquete; import java.util.concurrent.locks.condition; import java.util.concurrent.locks.reentrantlock; la prueba de clase pública implementa runnable {public static reentrantlock bloqueo = new ReentrantLock (); condición estática pública condición = bloqueo.newcondition (); @Override public void run () {try {Lock.lock (); condición.await (); System.out.println ("El hilo está sucediendo"); } catch (Exception e) {E.PrintStackTrace (); } Finalmente {Lock.unlock (); }} public static void main (string [] args) lanza interruptedException {test t = new test (); Hilo de hilo = nuevo hilo (t); Thread.Start (); Thread.sleep (2000); Lock.lock (); condición.signal (); Lock.unlock (); }}El ejemplo anterior es muy simple. Deje que un hilo espere y deje que el hilo principal lo despierte. condición.await ()/Señal solo se puede usar después de obtener el bloqueo.
1.3.semaphore
Para las cerraduras, es mutuamente excluyente. Significa que mientras obtenga la cerradura, nadie puede volver a conseguirlo.
Para Semaphore, permite que múltiples subprocesos ingresen a la sección crítica al mismo tiempo. Se puede considerar como un bloqueo compartido, pero el límite compartido es limitado. Después de que se usa el límite, otros hilos que no han obtenido el límite aún se bloquearán fuera del área crítica. Cuando la cantidad es 1, es equivalente a bloquear
Aquí hay un ejemplo:
Prueba de paquete; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.semaphore; implementos de prueba de clase pública ejecutables {final semáforo semáforo = nuevo semafore (5); @Override public void run () {try {semaphore.acquire (); Thread.sleep (2000); System.out.println (Thread.CurrentThread (). GetId () + "Done"); } catch (Exception e) {E.PrintStackTrace (); } Finalmente {Semaphore.Release (); }} public static void main (string [] args) lanza interruptedException {EjecutorService EjecutorService = Ejecutors.NewFixedThreadPool (20); prueba final t = nueva prueba (); para (int i = 0; i <20; i ++) {ExecutorService.submit (t); }}}Hay un grupo de hilos con 20 hilos, y cada hilo va a la licencia de Semaphore. Solo hay 5 licencias para semáforo. Después de ejecutar, puede ver que 5 son salidas en lotes, se emiten lotes.
Por supuesto, un hilo también puede solicitar múltiples licencias a la vez
Public void adquirir (int permite) lanza interruptedexception
1.4 ReadWriteLock
ReadWriteLock es un bloqueo que distingue funciones. La lectura y la escritura son dos funciones diferentes: la lectura de lectura no es mutuamente excluyente, la lectura-escritura es mutuamente excluyente y la escritura de escritura es mutuamente excluyente.
Este diseño aumenta la concurrencia y garantiza la seguridad de los datos.
Cómo usar:
Private static reentRantReadWriteLock readWriteLock = new ReEntRantReadWriteLock ();
bloqueo estático privado readlock = readwriteLock.readlock ();
bloqueo estático privado writeLock = readwriteLock.writeLock ();
Para ejemplos detallados, puede ver la implementación de Java de problemas de productores y consumidores y problemas de lector y escritor, y no la expandiré aquí.
1.5 CountdownLatch
Un escenario típico para un temporizador de cuenta regresiva es un lanzamiento de cohetes. Antes de que se lance el cohete, para garantizar que todo sea infalible, a menudo se realizan inspecciones de varios equipos e instrumentos. El motor solo puede encenderse después de que se completen todas las inspecciones. Este escenario es muy adecuado para CountdownLatch. Puede hacer que el hilo de encendido espere a que se complete todos los hilos de verificación antes de ejecutarlo
Cómo usar:
End de CountdownLatch final estático = New CountdownLatch (10);
end.countdown ();
end.await ();
Diagrama esquemático:
Un ejemplo simple:
Prueba de paquete; import java.util.concurrent.countdownlatch; import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class Test implementa runnable {estática final Final CountdownLatch CountdownLatch = New CountdownLatch (10); prueba final estática t = nueva prueba (); @Override public void run () {try {thread.sleep (2000); System.out.println ("Complete"); CountdownLatch.CountDown (); } catch (Exception e) {E.PrintStackTrace (); }} public static void main (string [] args) lanza interruptedException {EjecutorService EjecutorService = Ejecutors.NewFixedThreadPool (10); para (int i = 0; i <10; i ++) {ExecutorService.execute (t); } CountdownLatch.AWait (); System.out.println ("End"); EjecutorService.shutdown (); }}El hilo principal debe esperar a que se ejecute los 10 subprocesos antes de salir "Fin".
1.6 Cyclicbarrier
Similar a CountdownLatch, también está esperando que se completen algunos hilos antes de ejecutarlos. La diferencia con CountdownLatch es que este contador se puede usar repetidamente. Por ejemplo, supongamos que establecemos el contador en 10. Luego, después de reunir el primer lote de 10 hilos, el contador volverá a cero y luego reunir el siguiente lote de 10 hilos
Cómo usar:
Cyclicbarrier público (int Parties, Runnable BarrierAction)
La barrera es la acción que el sistema realizará cuando el contador cuenta una vez.
esperar()
Diagrama esquemático:
Aquí hay un ejemplo:
Prueba de paquete; import java.util.concurrent.cyclicbarrier; public class Test implementa Runnable {private String Soldier; Cyclicbarrier final privado Cyclic; Prueba pública (String Soldier, CyclicBarrier Cyclic) {this.soldier = Soldier; this.cclic = cyclic; } @Override public void run () {try {// espera a que todos los soldados lleguen cyclic.await (); Dowork (); // espera a que todos los soldados completen su trabajo cyclic.await (); } Catch (Exception e) {// TODO Auto Generado Bloque E.PrintStackTrace (); }} private void dowork () {// TODO Auto Generado Método STUB Try {Thread.sleep (3000); } catch (Exception e) {// tODO: manejar excepción} system.out.println (Soldier + ": Done"); } Public static Class BarrierRun implementa Runnable {Boolean Flag; int n; Public Barrierrun (Boolean Flag, int n) {super (); this.flag = flag; this.n = n; } @Override public void run () {if (flag) {system.out.println (n + "finalización de la tarea"); } else {system.out.println (n + "set finalation"); bandera = verdadero; }}} public static void main (string [] args) {final int n = 10; Hilo [] hilos = nuevo hilo [n]; bandera booleana = falso; CyclicBarrier Barrier = New CyclicBarrier (N, New BarrierRun (Flag, N)); System.out.println ("set"); for (int i = 0; i <n; i ++) {system.out.println (i+"informe"); hilos [i] = nuevo hilo (nueva prueba ("soldado" + i, barrera)); hilos [i] .Start (); }}}Resultado de impresión:
recolectar
0 informes
1 informe
2 informes
3 informes
4 informes
5 informes
6 informes
7 informes
8 informes
9 informes
10 sets Soldado completo 5: Hecho
Soldado 7: Hecho
Soldado 8: Hecho
Soldado 3: Hecho
Soldado 4: Hecho
Soldado 1: Hecho
Soldado 6: Hecho
Soldado 2: Hecho
Soldado 0: Hecho
Soldado 9: Hecho
10 tareas completadas
1.7 Locksupport
Proporcionar bloqueo de hilo primitivo
Similar a suspender
Locksupport.park ();
Locksupport.unpark (T1);
En comparación con la suspensión, no es fácil causar la congelación de subprocesos.
La idea de Locksupport es algo similar a Semaphore. Tiene una licencia interna. Le quita esta licencia cuando se estaciona y solicita esta licencia cuando no está a la altura. Por lo tanto, si no está antes del parque, no se producirá congelación de hilos.
El siguiente código es el código de muestra de suspensión en la base de múltiples subprocesos [Alta concurrencia Java 2]. Un punto muerto ocurre al usar suspensión.
prueba de paquete; import java.util.concurrent.locks.locksupport; prueba de clase pública {objeto estático u = nuevo objeto (); static testSuspendThread t1 = new testSuspendThread ("t1"); static testSuspendThread t2 = new testSuspendThread ("t2"); public static class testSuspendThread extiende el hilo {public testSuspendThread (name de cadena) {setName (name); } @Override public void run () {SynChronized (U) {System.out.println ("en" + getName ()); //Thread.currentThread (). Suspend (); Locksupport.park (); }}} public static void main (string [] args) lanza interruptedException {t1.start (); Hilt.sleep (100); t2.start (); // t1.resume (); // t2.resume (); Locksupport.unpark (T1); Locksupport.unpark (T2); t1.Join (); t2.join (); }}Sin embargo, el uso de Locksupport no causará puntos muertos.
además
Park () puede responder a las interrupciones, pero no arroja excepciones. El resultado de la respuesta de interrupción es que el retorno de la función Park () puede obtener el indicador de interrupción de Thread.interrupted ().
Hay muchos lugares en JDK que usan Park, por supuesto, la implementación de Locksupport también se implementa utilizando Unsafe.Park ().
Public static void park () {
unsafe.park (falso, 0l);
}
1.8 Implementación de reentrantlock
Presentemos la implementación de Reentrantlock. La implementación de Reentrantlock se compone principalmente de tres partes:
La clase principal de Reentrantlock tendrá una variable de estado para representar el estado sincrónico.
/*** El estado de sincronización. */ Estado privado volátiles int;
Establezca el estado para adquirir el bloqueo a través de la operación CAS. Si se establece en 1, el soporte de bloqueo se da al hilo actual
Final void Lock () {if (compareSetState (0, 1)) setExClusiveSownerThread (Thread.CurrentThread ()); más adquirir (1); }Si el bloqueo no tiene éxito, se realizará una aplicación
Public Final void adquirir (int arg) {if (! tryacquire (arg) && adquirequeuel (addwaiter (node.exclusive), arg)) autointerrupt (); }Primero, intente TryAcquire después de aplicar, porque otro hilo puede haber lanzado el bloqueo.
Si aún no ha solicitado el bloqueo, agregue camarero, lo que significa agregarle a la cola de espera
Nodo privado AddWaIter (modo de nodo) {nodo nodo = nuevo nodo (thread.currentThread (), modo); // prueba la ruta rápida de ENQ; Copia de seguridad de ENQ completo en el nodo de falla Pred = cola; if (pred! = null) {node.prev = pred; if (compareAndsettail (pred, nodo)) {pred.next = node; nodo de retorno; }} enq (nodo); nodo de retorno; }Durante este período, habrá muchos intentos de solicitar un bloqueo, y si aún no puede solicitar, se colgará.
Private Final Boolean ParkandCheckInterrupt () {Locksupport.park (this); return thread.interrupted (); }Del mismo modo, si el bloqueo se lanza y luego no se discute en detalle aquí.
2. Contenedor concurrente y análisis típico del código fuente
2.1 Concurrenthashmap
Sabemos que hashmap no es un contenedor a prueba de subprocesos. La forma más fácil de hacer que el hilo de hashmap sea usar
Colección.synchronizedmap, es un envoltorio para el hashmap
Mapa estático público M = colección.synchronizedMap (new Hashmap ());
Del mismo modo, para la lista, SET también proporciona métodos similares.
Sin embargo, este método solo es adecuado para los casos en que la cantidad de concurrencia es relativamente pequeña.
Echemos un vistazo a la implementación de SynchronizedMap
Mapa final privado <k, v> m; // Mapa de respaldo Objeto final mutex; // Objeto sobre el cual sincronizar sincronizedMap (map <k, v> m) {if (m == null) tirar nueva nullpointerException (); this.m = m; mutex = esto; } Sincronizadomap (map <k, v> m, objeto mutex) {this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) { return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { sincronizado (mutex) {return m.containsValue (valor);}} public v get (clave de objeto) {sincronizado (mutex) {return m.get (key);}} public v put (k key, v valor) {sincronizado (mutex) {return M.put (key, valor);}} public v eliminación (objeto) {sincronizado (mutex) {return m.put. (key, value);}}} public V Eliminar (Object Key) {Synchonized (Mutex) {return M. M.Remove (Key);}} public void putall (map <? Extends k, "extiende v> map) {sincronizado (mutex) {m.putall (map);}} public void clear () {sincronizado (Mutex) {m.cLear ();}}}}}}Envuelve el hashmap en el interior y luego sincronizó cada operación del hashmap.
Dado que cada método adquiere el mismo bloqueo (mutex), esto significa que las operaciones como Put and Retwin son mutuamente excluyentes, reduciendo en gran medida la cantidad de concurrencia.
Veamos cómo se implementa concurrenthashmap
public v put (k key, v valor) {segmento <k, v> s; if (value == null) tire nuevo nullpointerException (); int hash = hash (clave); int j = (hash >>> desplazamiento de segmento) y masas de segmento; if ((s = (segmento <k, v>) unsafe.getObject // no volátil; vuelve a verificar (segmentos, (j << sshift) + sbase)) == null) // en el gemido s = profundo (j); return s.put (clave, hash, valor, falso); }Hay un segmento de segmento dentro del ConcurrentHashmap, que divide el gran hashmap en varios segmentos (hashmap pequeño), y luego hash los datos en cada segmento. De esta manera, las operaciones hash de múltiples hilos en diferentes segmentos deben ser seguras de hilo, por lo que solo necesita sincronizar los hilos en el mismo segmento, lo que realiza la separación de los bloqueos y aumenta en gran medida la concurrencia.
Será más problemático cuando se use concurrenthashmap.size porque necesita contar la suma de datos de cada segmento. En este momento, debe agregar bloqueos a cada segmento y luego hacer estadísticas de datos. Esta es una pequeña desventaja después de separar el bloqueo, pero el método de tamaño no debe llamarse a alta frecuencia.
En términos de implementación, no usamos sincronizado y bloqueo. Al mismo tiempo, también hemos hecho algunas optimizaciones en la implementación de HashMap. No lo mencionaré aquí.
2.2 Bloquingqueue
Bloquingqueue no es un contenedor de alto rendimiento. Pero es un muy buen contenedor para compartir datos. Es una implementación típica de productores y consumidores.
Diagrama esquemático:
Para más detalles, puede consultar la implementación de Java de problemas de productor y consumidor y problemas de lector y escritor.