Escribimos sobre enviar y recibir mensajes a través de una cola con nombre. Si aún no lo sabe, haga clic: Comience con Rabbitmq Java. En este artículo, crearemos una cola de trabajo para distribuir tareas que consuman mucho tiempo entre los consumidores.
La tarea principal de una cola de trabajo es evitar ejecutar inmediatamente tareas intensivas en recursos y luego tener que esperar a que se completen. En cambio, realizamos la programación de tareas: encapsulamos la tarea como un mensaje y la enviamos a la cola. El trabajo se ejecuta en segundo plano y elimina constantemente las tareas de la cola y luego las ejecuta. Cuando ejecuta múltiples procesos de trabajadores, las tareas en la cola de tareas serán compartidas por el proceso del trabajador.
Tal concepto es extremadamente útil en las aplicaciones web cuando se requiere que se realicen tareas complejas entre solicitudes HTTP muy cortas.
1. Preparar
Usamos hilo. Duerme para simular tareas que consumen mucho tiempo. Agregamos un cierto número de puntos al final del mensaje enviado a la cola, cada punto significa que toma 1 segundo en el hilo del trabajador, como hola ... deberá esperar 3 segundos.
Remitente:
Newtask.java
import java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import canal ConnectionFactory factory = new ConnectionFactory (); factory.sethost ("localhost"); conexión conexión = factory.newConnection (); canal channel = conexión.createChannel (); // Declarar el canal de cola i ++) {string dots = ""; for (int j = 0; j <= i; j ++) {dots+= ".";} string Message = "HelloWorld"+dots+dots.length (); channel.basicpublish ("", queue_niM "'");} // Cerrar canal y recursos channel.close (); conecte.close ();}}Receptor:
Trabajo.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; import java.io.ioException, java.lang.interruptedException {// Distinga entre los diferentes procesos de trabajadores int hashcode = work.class.hashcode (); // crea conexión y canal Connectación factory = new ConnectionFactory (); factory.sethost ("localhost"); conexión de conexión = factory.newconnection (); channel = connection.creat.creatEns (); Queue Channel.queueDEClare (queue_name, false, false, false, null); system.out.println (hashcode + "[*] esperando mensajes. Para salir de presionar ctrl + c"); queueingconsumer consumo = new QueueingConsumer (canal); // especifica el consumo de cola. while (true) {queueingconsumer.delivery entrega = consumidor.nextDelivery (); string Message = new String (entregawning.getBody ()); system.out.println (hashcode + "[x] recibido '" + mensaje + "'"); dowork (mensaje); system.println (hashcode + "[x] do";}/}/** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** ** Toats Toats Toats tougs @param tarea * @throws interrumpedException */private static void dowork (tarea de cadena) arroja interruptedException {for (char ch: tarea.toCarArray ()) {if (ch == '.') Thread.sleep (1000);}}} Reenvío
La ventaja de usar colas de tareas es que pueden funcionar en paralelo fácilmente. Si tenemos mucha cartera de trabajo, podemos resolver el problema agregando más trabajadores, haciendo que el sistema sea más escalable.
A continuación, primero ejecutaremos las instancias de 3 trabajadores (trabajo.java) y luego ejecutaremos newtask.java. Las instancias de tres trabajadores obtendrán información. ¿Pero cómo asignar? Veamos el resultado de la salida:
[x] enviado 'helloworld.1' [x] enviado 'helloworld..2' [x] enviado 'helloworld ... 3' [x] enviado 'helloworld ...... 4' [x] enviado 'helloworld ...... 5' [x] enviado 'helloworld ...... 6' [x] enviado 'helloworld ....... 7' [x] envió 'helloworld ....... 8' [x] helloworld ... 'Helloworld .......... 10' Worker 1: 605645 [*] esperando mensajes. Para salir de presionar Ctrl+C605645 [x] Recibido 'Helloworld.1'605645 [x] done605645 [x] Recibido' Helloworld .... 4'605645 [x] do605645 [x] recibió 'helloworld ......... 7'605645 [x] do605645 [x] recibido 'Helloworld .......... 10'605645 [x] Done Worker 2: 18019860 [*] esperando mensajes. Para salir de presionar Ctrl+C18019860 [x] Recibido 'Helloworld..2'18019860 [x] dede18019860 [x] recibió' Helloworld ..... 5'18019860 [x] dede18019860 [x] recibió 'Helloworld ......... 8'18019860 [x] Worker 3: Worker 3: 18019860 [*] Esperando mensajes. Para salir de presionar Ctrl+C18019860 [x] Recibido 'Helloworld ... 3'18019860 [x] do8019860 [x] Recibido' helloworld ...... 6'18019860 [x] dede18019860 [x] recibido 'helloworld ...... 9'18019860 [x]
Como puede ver, por defecto, RabbitMQ enviará información al próximo consumidor uno por uno, independientemente de la duración de cada tarea, etc., y es una asignación única, no una por una asignación una por una. En promedio, cada consumidor recibirá una cantidad igual de información. Esta forma de distribuir mensajes se llama round-robin.
2. Messageacknowledgments
Se tarda varios segundos en realizar una tarea. Puede estar preocupado por las interrupciones cuando un trabajador realiza una tarea. En nuestro código anterior, una vez que RabbitMQ entrega un mensaje al consumidor, eliminará inmediatamente esta información de la memoria. En este caso, si uno de los trabajadores que realizan la tarea es asesinado, perderemos la información que está procesando. También perderemos mensajes que se han enviado a este trabajador y aún no se ha ejecutado.
En el ejemplo anterior, primero iniciamos dos tareas, luego ejecutamos el código que envía la tarea (newtask.java), y luego cerramos inmediatamente la segunda tarea, y el resultado es:
Trabajador 2: 31054905 [*] WaitingFormessages.ToExitPressCtrl+C 31054905 [x] Recibido'Helloworld..2 '31054905 [x] hecho 31054905 [x] Recibido'Helloworld .... 4' Trabajador 1: 18019860 [*] WaitingFormesage.toexitpressctrl+ 18019860 [x] recibido'Helloworld.1 '18019860 [x] Dado 18019860 [x] recibido'Helloworld ... 3' 18019860 [x] Dado 18019860 [x] recibió'Helloworld ......... 5 '18019860 [x] Dado 18019860 [x] Recibido'helloworld ..... 18019860 [x] hecho 18019860 [x] recibió'Helloworld ......... 9 '18019860 [x] Hecho
Se puede ver que el segundo trabajador perdió al menos las tareas 6, 8 y 10, y la Tarea 4 no se completó.
Sin embargo, no queremos perder ninguna tarea (información). Cuando se mata a un trabajador (destinatario), queremos pasar la tarea a otro trabajador.
Para garantizar que los mensajes nunca se pierdan, RabbitMQ admite los reconocimientos de mensajes. El consumidor envía una respuesta a RabbitMQ, diciéndole que la información ha sido recibida y procesada, y luego RabbitMQ puede eliminar libremente la información.
Si el consumidor es asesinado sin enviar una respuesta, RabbitMQ asumirá que la información no se ha procesado por completo y será redirigida a otro consumidor. De esta manera, puede confirmar que la información no se pierde, incluso si el consumidor se mata ocasionalmente.
Este mecanismo no significa que el tiempo de espera no sea el caso. RabbitMQ solo vuelve a repetir esta información cuando la conexión del consumidor se desconecta. Está permitido si el consumidor tarda un tiempo particularmente en procesar un mensaje.
La respuesta del mensaje está activada de forma predeterminada. En el código anterior, apagamos este mecanismo configurando Autoask = True como se muestra. Vamos a modificar el código (Work.java):
ack booleano = falso; // Abrir el mecanismo de respuesta Channel.BasicConsume (queue_name, ack, consumidor); // Además, debe enviar manualmente una respuesta después de que cada procesamiento complete un mensaje. Channel.Basicack (entrega.getenvelope (). getDeliveryTag (), falso);
Trabajo completamente modificado.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; import java.io.ioException, java.lang.interruptedException {// Distinga entre los diferentes procesos de trabajadores int hashcode = work.class.hashcode (); // crea conexión y canal Connectación factory = new ConnectionFactory (); factory.sethost ("localhost"); conexión de conexión = factory.newconnection (); channel = connection.creat.creatEns (); Queue Channel.queueDEClare (queue_name, false, falso, false, null); system.out.println (hashcode + "[*] esperando mensajes. Channel.BasicConsume (queue_name, ack, consumidor); while (true) {queueingconsumer.delivery entrega = consumidor.nextDelivery (); string Message = new String (administración.getBody ()); system.out.println (hashcode + "[x] recibido '" + mensaje + "'"); dowork (mensaje); system.println (hashcode + "[x] hecho"); // send Channel.Basicack (entrega.getenvelope (). getDeliverytag (), falso);}}}prueba:
Cambiamos el número de mensajes a 5, luego abrimos dos consumidores (Work.java), luego enviamos una tarea (newtask.java), cerramos inmediatamente a un consumidor y observa la salida:
[x] envió'helloworld..2 '[x] enviado'HellowOrld ... 3' [x] enviado'HellowOrld ... 4 '[x] enviado'Helloworld ...... 5' Worker2 18019860 [*] WaitingFormessages.ToExitPressCtrl+C 18019860 [x] Recibido'helloworld..2 '18019860 [x] realizado 18019860 [x] recibido'Helloworld .... 4 'Worker1 31054905 [*] WaitingFormessages.ToExitPressCtrl+C 31054905 [x] recibido'Helloworld.1' 31054905 [x] hecho 31054905 [x] recibido '31054905 [x] realizado 31054905 [x] Recibido'Helloworld .... 5 '31054905 [x] Dado 31054905 [x] recibido'HelloworLld .... 4' 31054905 [x] Hecho
Puede ver que la Tarea 4 que no completó por Worker 2 se vuelve a forar al Worker 1 para su finalización.
3. Durabilidad mensajes
Hemos aprendido que incluso si los consumidores son asesinados, el mensaje no se perderá. Pero si el servicio RabbitMQ se detiene en este momento, nuestros mensajes aún se perderán.
Cuando RabbitMQ salga o sale anormalmente, todas las colas e información se perderán a menos que le diga que no lo pierda. Necesitamos hacer dos cosas para asegurarnos de que la información no se pierda: necesitamos establecer banderas persistentes para todas las colas y mensajes.
Primero, debemos confirmar que RabbitMQ nunca perderá nuestra cola. Para hacer esto, necesitamos declararlo como persistente.
booleandurable = verdadero;
Channel.QueueDeclare ("task_queue", duradero, falso, falso, nulo);
Nota: RabbitMQ no permite redefinir una cola con diferentes parámetros, por lo que no podemos modificar los atributos de la cola que ya existe.
En segundo lugar, necesitamos identificar nuestra información como persistente. Establezca el valor de MessageProperties (ImplementsBasicProperties) en persistente_text_lain.
Channel.BasicPublish ("", "Task_queue", MessageProperties.Persistent_Text_Plain, Message.getBytes ());
Ahora puede ejecutar un programa que envía mensajes, luego cerrar el servicio, reiniciar el servicio y ejecutar el programa de consumo para hacer el experimento.
4. Fairdispatch
Quizás descubramos que el mecanismo actual de reenvío de mensajes (round-robin) no es lo que queremos. Por ejemplo, en este caso, para dos consumidores, hay una serie de tareas, las tareas impares requieren mucho tiempo, mientras que incluso las tareas son fáciles, lo que hace que un consumidor esté ocupado, mientras que el otro consumidor complete rápidamente la tarea y espere después de que termine.
La razón de esto es que RabbitMQ solo reenvía mensajes cuando el mensaje llega a la cola. No me importa cuántas tareas no haya entregado el consumidor a RabbitMQ. Simplemente reenvíe ciegamente todos los números impares a un consumidor e incluso números a otro consumidor.
Para resolver este problema, podemos usar el método BASICQOS, pasando el parámetro como prefetchcount = 1. Esto le dice a RabbitMQ que no dé más de un mensaje a un consumidor al mismo tiempo. En otras palabras, el siguiente mensaje se enviará solo cuando el consumidor esté inactivo.
int prefetchcount = 1; Channel.Basicqos (prefetchcount);
Nota: Si todos los trabajadores están ocupados, su cola puede ser llena. Puede observar el uso de la cola y luego agregar trabajadores, o usar alguna otra estrategia.
Prueba: cambie el código para enviar un mensaje, cambie el número final de puntos a 6-2, luego inicie los dos trabajadores primero y luego envíe el mensaje:
[x] Enviado 'Helloworld ...... 6' [x] enviado 'Helloworld ..... 5' [x] enviado 'Helloworld .... 4' [x] enviado 'Helloworld ... 3' [x] enviado 'Helloworld..2' Worker 1: 18019860 [*] esperando mensajes. Para salir de presione Ctrl+C18019860 [x] Recibido 'Helloworld ...... 6'18019860 [x] done18019860 [x] recibió' helloworld ... 3'18019860 [x] trabajador dado 2: 31054905 [*] esperando mensajes. Para salir de presione Ctrl+C31054905 [x] Recibido 'Helloworld ...... 5'31054905 [x] done31054905 [x] recibió' helloworld ...... 4'31054905 [x] listo
Se puede ver que el mensaje no se reenvió de acuerdo con el mecanismo anterior de Round-Robin en este momento, pero se envió cuando el consumidor no estaba ocupado. Además, bajo este modelo, los consumidores reciben apoyo para aumentar dinámicamente porque el mensaje no se envía, y el aumento dinámico aumenta de inmediato. El mecanismo de reenvío predeterminado causará, incluso si el consumidor se agrega dinámicamente, el mensaje se ha asignado y no se puede agregar de inmediato, incluso si hay muchas tareas inacabadas.
5. Código completo
Newtask.java
import java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; import com.rabbitmq.client.messageProperties; public class NewTask {// Nombre de la cola String Final static stateeeeeeeeeeueeeeeeee_name = "" static void main (string [] args) lanza IOException {// Crear conexión y canal ConnectionFactory factory = new ConnectionFactory (); factory.sethost ("localhost"); conexión conexión = factory.newconnection (); canal = conexión.createChannel (); // declarar el boole de la cola didurbible = true; // 1. set queue queue queue queue queue queue queue Persistence Channel.queueDEClare (queue_name, duradera, falso, falso, null); // Envía 10 mensajes, agregando 1-10 puntos después del mensaje a su vez para (int i = 5; i> 0; i--) {string dots = ""; for (int j = 0; j <= i; j ++) {dots += ";";} string Message = "HELLOWORLD" +DOTS "(DOTS (); MessageProperties 2. Establezca Message Persistence Channel.BasicPublish ("", queue_name, MessageProperties.Persistent_Text_Plain, Message.getBytes ()); System.out.Println ("[x] enviado" + Message + "" ");} // Cerrar canal y recurso Channel.close (); Connection.close ();}}}}Trabajo.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabitmq.client.connectionFactory; import lanza java.io.ioException, java.lang.interruptedException {// diferencian la salida de diferentes procesos de trabajadores int hashcode = work.class.hashcode (); // crea conexión y canal Connectiony factory = new ConnectionFactory (); factory.sethost ("localhost"); conexión de conexión = factory.newonnection (); new Connection (); factory.sethost ("localhost"); conexión de conexión = factory.newonnection ();; Declare la cola boolean duradera = true; channel.queueDeclare (queue_name, duradera, falso, falso, null); system.out.println (hashcode + "[*] esperando mensajes. Para salir presione ctrl + c"); // Establezca el número máximo de mensajes reenviados para el servicio int prefetchcunt = 1; Channel.Basicqos (prefetchcount); QueueingConsumer Consumer = new QueueingConsumer (canal); // Especifique el consumo quee boolean ack = false; // active el mecanismo de respuesta canal.basicconsume (queue_name, ack, consumidor); while (true) {queueingconsumer.delivery entrega = consumidor.nextDelivery (); string Message = new String (administración.getBody ()); system.out.println (hashcode + "[x] recibió '" + mensaje + "'"); dowork (mensaje); system.println (hashcode + "[x] Hecho "); // canal.basicack (entrega.getenvelope (). GetDeliveryTag (), false); canal.basicack (entrega.getEnvelope (). GetDeliveryTag (), falso);}}/** * Cada punto toma 1S * * @Param tarea * @throws InterruptedException */privado void void dowork (string Task) Task) Tasks SilterEd (Task). task.toCarArray ()) {if (ch == '.') Thread.sleep (1000);}}}Resumir
Lo anterior es toda la explicación detallada del código de cola de trabajo Java en este artículo, espero que sea útil para todos. Si hay alguna deficiencia, deje un mensaje para señalarlo. ¡Gracias amigos por su apoyo para este sitio!