1. Uso básico de piscinas de hilos
1.1. ¿Por qué necesitas una piscina de hilos?
En los negocios diarios, si queremos usar múltiples subprocesos, crearemos hilos antes de que comience el negocio y destruiremos hilos después de que termine el negocio. Sin embargo, para los negocios, la creación y destrucción de los hilos no tienen nada que ver con el negocio en sí, y solo se preocupa por las tareas realizadas por el hilo. Por lo tanto, espero usar tantas CPU como sea posible para realizar tareas, en lugar de crear y destruir hilos que no están relacionados con los negocios. El grupo de hilo resuelve este problema. La función del grupo de hilos es reutilizar hilos.
1.2. ¿Qué apoyo nos proporciona JDK?
Los diagramas de clase relacionados en JDK se muestran en la figura anterior.
Varias categorías especiales para mencionar.
La clase invocable es similar a la clase Runable, pero la diferencia es que Callable tiene un valor de retorno.
ThreadPoolExecutor es una implementación importante de los grupos de hilos.
Los ejecutores son una clase de fábrica.
1.3. Uso de piscinas de hilos
1.3.1. Tipos de piscinas de hilos
public static EjecutorService NewfixedThreadPool (int nthreads) {return New ThreadPoolexecutor (nthreads, nthreads, 0l, timeUnit.milliseConds, New LinkedBlowingqueue <runnable> ());} public static staticService NewshreadexeCutor () ThreadPoolExecutor (1, 1, 0L, TimeUnit.MillisEconds, New LinkedBlockingqueue <Runnable> ()));} public static EjecutorService NewCachedThreadPool () {return New ThreadPoolExeCutor (0, Integer.Max_Value, 60l, TimeUnit.sEseConds, New Synchronqueueeeee <RuneReue;Desde una perspectiva del método, es obvio que FixedThreadPool, SinglethreadExecutor y CachedThreadPool son diferentes instancias de ThreadPoolExecutor, pero los parámetros son diferentes.
Public ThreadPoolExeCutor (int corePoolSize, int MaximumummoolSize, Long KeepAlivEtime, TimeUnit Unit, Bloquingqueue <Runnable> WorkQuqueue) {this (corepoolSize, MaximumumupoolSize, KeepAlivetime, Unidad, Workqueue, ejecutores.defaulthreadory (), defaulthandler);} Describamos brevemente el significado de los parámetros en el constructor ThreadPoolExecutor.
De esta manera, al observar el FixedThreadPool mencionado anteriormente, el número de núcleos y el número máximo de subprocesos es el mismo, por lo que los hilos no serán creados y destruidos durante el trabajo. Cuando el número de tareas es grande y los hilos en el grupo de subprocesos no se pueden satisfacer, la tarea se guardará en LinkedBlockingqueue, y el tamaño de LinkedBlokingqueue es entero.max_value. Esto significa que la adición continua de tareas hará que la memoria consuma cada vez más.
Cachedthreadpool es diferente. Su número de subprocesamiento central es 0, el número máximo de almacenamiento es entero.max_value, y su cola de bloqueo es Synchronousqueue, que es una cola especial, y su tamaño es 0. Dado que el número de hilos centrales es 0, es necesario agregar la tarea a Synchronousqueue. Esta cola solo puede tener éxito cuando un hilo agrega datos y otro hilo obtiene datos de ella. Agregar datos a esta cola solo devolverá una falla. Cuando la devolución falla, el grupo de subprocesos comienza a expandir el hilo, por lo que el número de hilos en CachedThreadpool no se soluciona. Cuando el hilo no se usa para 60, el hilo se destruye.
1.4. Pequeños ejemplos de uso de la piscina de hilos
1.4.1. Piscina de hilos simple
import java.util.concurrent.executorservice; import java.util.concurrent.executors; public class ThreadpoOldemo {public static class myTask implements runnable {@Override public void run () {system.out.println (System.CurrentTimemillis () + "Thread Id:" + Thread.currentThrurreRt ().).).).).).).).).). intente {Thread.sleep (1000); } catch (Exception e) {E.PrintStackTrace (); }}} public static void main (string [] args) {myTask myTask = new myTask (); EjecutorService es = ejecutors.newFixedThreadPool (5); para (int i = 0; i <10; i ++) {es.submit (myTask); }}} Dado que se usa NewFixedThreadPool (5), pero se inician 10 hilos, 5 se ejecutan a la vez, y es obvio que se ve la reutilización de hilos. Threadid se repite, es decir, las primeras 5 tareas y las últimas 5 tareas se ejecutan por el mismo lote de hilos.
Lo que se usa aquí
es.submit (mytask);
También hay una forma de enviar:
es.Execute (MyTask);
La diferencia es que Subt devolverá un objeto futuro, que se introducirá más adelante.
1.4.2.ScheduledThreadPool
import java.util.concurrent.executors; import java.util.concurrent.scheduledExecutorservice; import java.util.concurrent.timeunit; public class Threadpooldemo {public static void main (string [] args) {ProgramedExecutorservice ses = ejecutors. // Si la tarea anterior no se ha completado, el despacho no comenzará. ses.schedulewithfixedDelay (new runnable () {@Override public void run () {try {Thread.sleep (1000); System.out.println (System.CurrentTimemillis ()/1000);} Catch (Exception e) {// TODO: Handle Exception}} Id, 0, 2, Timeunit.s. segundos, y luego ejecute una vez cada 2 segundos en un ciclo}}Producción:
1454832514
1454832517
1454832520
1454832523
1454832526
...
Dado que la ejecución de la tarea toma 1 segundo, la programación de tareas debe esperar a que la tarea anterior se complete. Es decir, cada 2 segundos aquí significa que se iniciará una nueva tarea 2 segundos después de que se complete la tarea anterior.
2. Extender y mejorar el grupo de hilos
2.1. Interfaz de devolución de llamada
Hay algunas API de devolución de llamada en el grupo de subprocesos para proporcionarnos operaciones extendidas.
EjecutorService ES = new ThreadPoolExecutor (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingqueue <Runnable> ()) {@Override Protected Void antes de EXECUTE (Thread t, Runnable R) {System.out.println ("Prepárese para ejecutar"); } @Override protegido vacío AfterExCute (runnable r, showable t) {System.out.println ("Ejecución completada"); } @Override Protected Void terminado () {System.out.println ("Salida del grupo de hilos"); }};Podemos implementar los métodos AutoryExCute, After Execute y finalizados de ThreadPoolExecutor para implementar la gestión de registros u otras operaciones antes y después de la ejecución de subprocesos, la salida del grupo de subprocesos.
2.2. Estrategia de rechazo
A veces, las tareas son muy pesadas, lo que resulta en demasiada carga en el sistema. Como se mencionó anteriormente, cuando aumenta el número de tareas, todas las tareas se colocarán en la cola de bloqueo de FixedThreadPool, lo que resulta en demasiado consumo de memoria y, finalmente, el desbordamiento de la memoria. Tales situaciones deben evitarse. Entonces, cuando encontramos que el número de hilos excede el número máximo de hilos, debemos renunciar a algunas tareas. Al descartar, debemos escribir la tarea en lugar de tirarla directamente.
Hay otro constructor en ThreadPoolExecutor.
Public ThreadPoolExeCutor (int corePoolSize, int MaximumummoolSize, Long KeepAliveTime, TimeUnit Unit, Bloquingqueue <Runnable> WorkQueue, ThreadFactory ThreadFactory, RecheedEdEdEdExeCutionHandler Handler) {if (corePoolSize <0 || maximumumumeolSize <= 0 || maximoMupoolSize <corePoolSize || keepAlIteTime <0 || maximumumumsize <= 0 || maximumUptOn IlegalargumentException (); if (workqueue == null || threadFactory == null || handler == null) tirar nueva nullpointerException (); this.corepoolSize = corePoolSize; this.maxiMumumpoolSize = maximumumeMoLSize; this.WorkQueue = WorkQueue; this.keepaliveTime = unit.tonanos (keepAlivEtime); this.threadFactory = ThreadFactory; this.handler = Handler; } Introduciremos ThreadFactory más tarde.
El controlador rechaza la implementación de la política, que nos dirá qué hacer si la tarea no se puede ejecutar.
Hay un total de las 4 estrategias anteriores.
Abortpolicy: si la tarea no se puede aceptar, se lanza una excepción.
Callerrunspolicy: si la tarea no se puede aceptar, deje que el hilo de llamada se complete.
DecardoldestPolicy: si la tarea no se puede aceptar, la tarea más antigua será descartada y mantenida por una cola.
Descardpolicy: si la tarea no se puede aceptar, la tarea se descartará.
EjecutorService ES = new ThreadPoolExeCutor (5, 5, 0L, TimeUnit.Seconds, New LinkedBlockingqueue <Runnable> (), new RecheedEdExecutionHandler () {@Override public void RecheedExeCution (runnable r, ThreadPoolExeCutor ejecutor) {System.out.println (r.Tostring () + ");"); ");"); "); Por supuesto, también podemos implementar la interfaz de DrecheDeCutionHandler nosotros mismos para definir la política de rechazo nosotros mismos.
2.3. Personalizar ThreadFactory
Acabo de ver que ThreadFactory se puede especificar en el constructor de ThreadPoolExecutor.
Los hilos en el grupo de subprocesos están creados por la fábrica de subprocesos, y podemos personalizar la fábrica de subprocesos.
Fábrica de subprocesos predeterminada:
Clase estática predeterminada implementos de factory threadFactory {private static final atomicInteger piscolnumber = new AtomItInseger (1); Grupo de Grupo de Thread Group final privado; Private final AtomicInteger ThreadNumber = new AtomicInteger (1); NAMEPREFIX de cadena final privada; DefaultThreadFactory () {SecurityManager S = System.getSecurityManager (); grupo = (s! = nulo)? s.getThreadGroup (): thread.currentThread (). getThreadGroup (); namePrefix = "Pool-" + PoolNumber.getAndIrrement () + "-Thread-"; } public thread newThread (runnable r) {Thread t = new Thread (grupo, R, NamePrefix + ThreadNumber.getAndIrrement (), 0); if (t.isdaemon ()) t.setdaemon (falso); if (t.getpriority ()! = thread.norm_priority) t.setpriority (hild.norm_priority); regresar t; }}3. Forkjoin
3.1. Pensamientos
Es la idea de dividir y conquistar.
La horquilla/unión es similar al algoritmo MapReduce. La diferencia entre los dos es: Fork/Join se divide en pequeñas tareas solo cuando es necesario, como si la tarea sea muy grande, mientras que MapReduce siempre comienza a realizar el primer paso para la segmentación. Parece que Fork/Join es más adecuado para un nivel de hilo dentro de un JVM, mientras que MapReduce es adecuado para sistemas distribuidos.
4.2. Uso de la interfaz
RecursIVEACTION: Sin valor de retorno
RecursiveTask: hay un valor de devolución
4.3. Ejemplo simple
import java.util.arrayList; import java.util.concurrent.forkjoinpool; import java.util.concurrent.forkJoNtask; import java.util.concurrent.recursiveTask; public class CountTask se extiende Recursivetask <Long> {INTOM ATRATO PRIVADO INTROMAL = 10000; comienzo privado largo; extremo largo privado; public CountTask (Long Start, Long End) {super (); this.start = inicio; this.end = end; } @Override protegido Long Compute () {Long Sum = 0; Boolean CANCOMPUTE = (end - inicio) <umbral; if (cancompute) {for (long i = start; i <= end; i ++) {sum = sum+i; }} else {// dividido en 100 tareas pequeñas paso largo = (inicio + end)/100; ArrayList <CountTask> SubTasks = new ArrayList <CountTask> (); larga pos = inicio; para (int i = 0; i <100; i ++) {Long dentone = pos+step; if (dastone> end) {dentone = end; } CountTask subtask = new CountTask (pos, dastone); pos + = paso + 1; subtarus.add (subtarea); subTask.fork (); // Push Subtasks a la piscina de subprocesos} para (CountTask t: SubTasks) {sum += t.Join (); // esperando que todas las subtareas finalicen}} suma de retorno; } public static void main (string [] args) {forkjoinpool forkjoinpool = new Forkjoinpool (); Tarea CountTask = nueva CountTask (0, 200000l); ForkJoNkask <Along> result = forkjoinpool.submit (tarea); intente {long res = result.get (); System.out.println ("sum =" + res); } catch (Exception e) {// tODO: manejar excepción E.PrintStackTrace (); }}} El ejemplo anterior describe una tarea de resumir. Divida las tareas acumuladas en 100 tareas, cada tarea solo realiza una suma de los números, y después de la unión final, se acumula la suma calculada por cada tarea.
4.4. Elementos de implementación
4.4.1.WorkQueue y CTL
Cada hilo tendrá una cola de trabajo
trabajo de clase final estática de trabajo
En la cola de trabajo, habrá una serie de campos que administran hilos.
Volátil int EventCount; // recuento de inactivación codificada; <0 si está inactivo
int nextwait; // Registro codificado del próximo camarero de eventos
int narrada; // Número de aceros
int // Sugerencia de índice de acero
PoolIndex corto; // índice de esta cola en el grupo
modo corto final; // 0: Lifo,> 0: Fifo, <0: Shared
Volátil int qlock; // 1: bloqueado, -1: terminar; de lo contrario 0
Base int volátil; // Índice de la próxima ranura para la encuesta
int top; // Índice de la siguiente ranura para empuje
Array de forkJoTask <?> []; // Los elementos (inicialmente no asignados)
Final Forkjoinpool Pool; // La piscina que contiene (puede ser nula)
Propietario final de ForkjoinworkerThread; // poseer hilo o nulo si se comparte
Hilo volátil Parker; // == Propietario durante la llamada al parque; más nulo
Volátil ForkJoTask <?> Currentjoin; // Tarea que se une en augejoin
ForcyJoinTask <?> CurrentSteal; // Tarea actual no local que se está ejecutando
Cabe señalar aquí que hay una gran diferencia entre JDK7 y JDK8 en la implementación de Forkjoin. Lo que presentamos aquí es de JDK8. En el grupo de hilos, a veces no todos los hilos se están ejecutando, se suspenderán algunos hilos y esos hilos suspendidos se almacenarán en una pila. Está representado internamente por una lista vinculada.
NextWait señalará el siguiente hilo de espera.
El índice del índice del subíndice en el grupo de subprocesos de PoolIndex.
EventCount Cuando se inicializa, EventCount está relacionado con PoolIndex. Un total de 32 bits, el primer bit indica si se activa y 15 bits indica el número de veces que ha sido suspendido
EventCount, el resto representa a PoolIndex. Use un campo para representar múltiples significados.
WorkQuqueue WorkQuqueue está representado por la matriz ForkJoTask <?> []. La parte superior y la base representan ambos extremos de la cola, y los datos están entre estos dos.
Mantener CTL (tipo de 64 bits de largo) en Forkjoinpool
Volátil CTL largo;
* Field CTL es un largo lleno de:
* AC: Número de trabajadores activos sin paralelismo objetivo (16 bits)
* TC: número de trabajadores totales menos paralelismo objetivo (16 bits)
* ST: Verdadero si se termina el grupo (1 bit)
* EC: El recuento de espera del hilo de espera superior (15 bits)
* ID: PoolIndex de Top of Treiber Stack of Waiters (16 bits)
AC representa el recuento de hilos activos menos el grado de paralelismo (probablemente el número de CPU)
TC significa el número total de hilos menos paralelismo
ST indica si el grupo de hilos en sí está activado
La CE representa el número de hilos suspendidos en el tiempo de espera superior
ID indica el piscolindex esperando el hilo en la parte superior
Es obvio que ST+EC+ID es lo que acabamos de llamar eventCount.
Entonces, ¿por qué tienes que sintetizar una variable con 5 variables? De hecho, la capacidad ocupa casi la misma con 5 variables.
La legibilidad de usar un código variable será mucho peor.
Entonces, ¿por qué usar una variable? De hecho, esto es lo más inteligente, porque estas 5 variables son todo. En el subproceso múltiple, si se usan 5 variables, al modificar una de las variables, cómo garantizar la integridad de las 5 variables. Luego, usar una variable resolverá este problema. Si se resuelve con cerraduras, el rendimiento se degradará.
El uso de una variable garantiza la consistencia y la atomicidad de los datos.
Los cambios en el escuadrón Forkjoin CTL se realizan utilizando operaciones CAS. Como se mencionó en la serie anterior de artículos, CAS es una operación sin bloqueo y tiene un buen rendimiento.
Dado que las operaciones CAS solo pueden dirigirse a una variable, este diseño es óptimo.
4.4.2. Robo de trabajo
A continuación, presentaremos el flujo de trabajo de todo el grupo de hilos.
Cada hilo llama a Runworker
Final Void Runworker (WorkQueue W) {W.GrowArray (); // Asignar cola para (int r = w.hint; scan (w, r) == 0;) {r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}} La función Scan () es para escanear las tareas que se realizarán.
R es un número relativamente aleatorio.
Private Final Int Scan (WorkQueue w, int r) {WorkQueue [] ws; int m; largo C = CTL; // para verificación de consistencia if ((ws = workqueues)! = null && (m = ws.length - 1)> = 0 && w! = null) {for (int j = m + m + 1, ec = w.eventcount ;;) {WorkQueue Q; int b, e; ForkJoNkask <?> [] A; FORKJOINTASK <?> T; if ((q = ws [(r - j) & m])! = null && (b = q.base) - q.top <0 && (a = q.array)! = null) {long i = (((longitud - 1) & b) << ashift) + abase; if ((t = ((((((ForkJoNTASK else if (q.base == b && u.compareandswapobject (a, i, t, null)) {U.putOrderedInt (Q, QBase, B + 1); if ((b + 1) - q.top <0) SignalWork (WS, Q); W.Runtask (t); } } romper; } else if (--j <0) {if ((ec | (e = (int) c)) <0) // retorno inactivo o terminado awaitwork (w, c, ec); else if (ctl == c) {// intente inactivar y enqueue long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; W.EventCount = EC | Int_sign; if (! u.compareandswaplong (this, ctl, c, nc)) w.eventCount = ec; // retroceder} break; }}} return 0; } Echemos un vistazo al método de escaneo. Un parámetro de escaneo es el trabajo de trabajo. Como se mencionó anteriormente, cada hilo tendrá un trabajo de trabajo, y el trabajo de trabajo de múltiples hilos se guardará en WorkQueues. R es un número aleatorio. Use R para encontrar un trabajo de trabajo y tener tareas que hacer en WorkQueue.
Luego, a través de la base de trabajo, obtenga el desplazamiento de la base.
B = Q.Base
..
largo i = (((A. longitud - 1) y b) << ashift) + abase;
..
Luego obtenga la última tarea a través de la compensación y ejecute esta tarea.
T = ((ForkJoNkask <?>) U.GetObjectVolatile (a, i))
..
W.Runtask (t);
..
A través de este análisis aproximado, descubrimos que después de que el subproceso actual llama al método de escaneo, no ejecutará las tareas en el trabajo actual, pero obtendrá otras tareas de trabajo a través de un número aleatorio R. Este es uno de los principales mecanismos de Forkjoinpool.
El hilo actual no solo se centrará en sus propias tareas, sino que priorizará otras tareas. Esto evita que ocurra el hambre. Esto evita que algunos hilos no puedan completar las tareas en el tiempo debido a las razones atascadas u otras razones, o un hilo tiene una gran cantidad de tareas, pero otros hilos no tienen nada que hacer.
Luego echemos un vistazo al método RunTask
Final void runTask (ForkJoCkoNk <?> tarea) {if ((currentsteal = task)! = null) {forkjoinworkerThread hilo; task.doExec (); ForkJoNkask <?> [] A = array; int md = modo; ++ nsteals; CurrentSteal = null; if (md! = 0) pollandexecall (); else if (a! = null) {int s, m = a.length - 1; FORKJOINTASK <?> T; while ((s = top - 1) - base> = 0 && (t = (forkJoCkoNkask <?>) u.getandSetObject (a, ((M & s) << ashift) + abase, null))! = null) {top = s; t.doexec (); }} if ((thread = propietario)! = null) // no es necesario hacer en finalmente cláusula hilo.afterToPlevelExec (); }}Hay un nombre interesante: CurrentSteal, la tarea robada es de hecho lo que acabo de explicar.
task.doExec ();
Esta tarea se completará.
Después de completar las tareas de otras personas, completará sus propias tareas.
Obtenga la primera tarea obteniendo la parte superior
while ((s = top - 1) - base> = 0 && (t = (forkJoCkoNkask <?>) u.getandSetObject (a, ((M & s) << ashift) + abase, null))! = null) {top = s; t.doexec ();}A continuación, use un gráfico para resumir el proceso del grupo de subprocesos en este momento.
Por ejemplo, hay dos hilos T1 y T2. T1 obtendrá la última tarea de T2 a través de la base de T2 (por supuesto, en realidad es la última tarea de un hilo a través de un número aleatorio R), y T1 también realizará su primera tarea a través de su propia parte superior. Por el contrario, T2 hará lo mismo.
Las tareas que realiza para otros hilos comienzan desde la base, y las tareas que tomas comienzan desde la parte superior. Esto reduce el conflicto
Si no se encuentran otras tareas
else if (--j <0) {if ((ec | (e = (int) c)) <0) // retorno inactivo o terminado en espera (w, c, ec); else if (ctl == c) {// intente inactivar y enqueue long nc = (long) ec | ((c - ac_unit) & (ac_mask | tc_mask)); w.nextwait = e; W.EventCount = EC | Int_sign; if (! u.compareandswaplong (this, ctl, c, nc)) w.eventCount = ec; // retroceder} break; } Luego, primero, el valor de CTL se cambiará a través de una serie de ejecuciones, se obtendrá NC y luego se asignará el nuevo valor con CAS. Luego llame a AwaitWork () para ingresar al estado de espera (llamado Método del Parque Unsafe mencionado en la serie de artículos anteriores).
Lo que debemos explicar aquí es cambiar el valor CTL. Aquí, primero, AC -1 en CTL, y AC ocupa los 16 bits principales de CTL, por lo que no puede ser directamente -1, sino que logra el efecto de hacer los 16 bits superiores de CTL -1 a través de AC_UNIT (0x10000000000000000) de los primeros 16 bits de CTL.
Como se mencionó anteriormente, la cuenta de eventos salva el PoolIndex, y a través del PoolIndex y NextWait en WorkQueue, puede atravesar todos los hilos que esperan.