Escrevemos sobre como enviar e receber mensagens através de uma fila nomeada. Se você ainda não sabe, clique em: Introdução ao RabbitMQ Java. Neste artigo, criaremos uma fila de trabalho para distribuir tarefas demoradas entre os consumidores.
A principal tarefa de uma fila de trabalho é evitar a execução imediatamente de tarefas com uso intensivo de recursos e, em seguida, ter que esperar que eles sejam concluídos. Em vez disso, realizamos o agendamento de tarefas: encapsulamos a tarefa como uma mensagem e a enviamos para a fila. O trabalho é executado em segundo plano e remove constantemente as tarefas da fila e as executa. Quando você executa vários processos de trabalhadores, as tarefas na fila de tarefas serão compartilhadas pelo processo do trabalhador.
Esse conceito é extremamente útil em aplicativos da Web quando tarefas complexas precisam ser executadas entre solicitações HTTP muito curtas.
1. Prepare
Usamos o Thread.Sleep para simular tarefas demoradas. Adicionamos um certo número de pontos no final da mensagem enviada para a fila, cada ponto significa que leva 1 segundo no thread do trabalhador, como Hello ... precisará esperar 3 segundos.
Remetente:
Newtask.java
importar java.io.ioException; importar com.rabbitmq.client.channel; importar com.rabbitmq.client.connection; importar com.rabbitmq.client.connectionFactory; public Class newTask {// filee name privado final string queueeeee_name = "trabalho de trabalho"; canal conexão de fábrica = new ConnectionFactory (); Factory.Sethost ("localhost"); conexão de conexão = factory.newConnection (); canal canal = Connection.createChannel (); // declara o canal da fila; i ++) {string Dots = ""; para (int j = 0; j <= i; j ++) {Dots+= ".";} string message = "helloworld"+pontos+pontos.Length (); Channel.basicpublish ("", queue_name, nulo, mensagens.Getbytes (); system. system. system. system.sic ("", Queue_name, null, fetbybytes (). "'");} // fechar o canal e o recurso canal.close (); conexão.close ();}}Receptor:
Work.java
importar com.rabbitmq.client.channel; importar com.rabbitmq.client.connection; importar com.rabbitmq.client.ConnectionFactory; importar com.rabbitmq.client.queueingConsumer; public class Work {// citação privada Final Static String; java.io.ioException, java.lang.interruptedException {// distingue entre diferentes processos de trabalhadores int hashcode = work.class.hashcode (); // criação de conexão e canal de conexão de conexão = fábrica.NewareNeNENCTY (); fábrica. fila canal.queedeclare (fileue_name, false, false, false, null); system.out.println (hashcode + "[*] aguardando mensagens. Para sair da pressão ctrl + c"); fileEingConsumer Consumer = while (true) {fileueingConsumer.Delivery entrega = consumer.NextDelivery (); string message = new String (Delivery.getBody ()); System.out.println (hashcode + "[x] recebido '" + message + ""); DOWORK (message); System.out.Println (Hashcode + ""); @Param Task * @Throws InterruptEdException */private estático void Dowork (tarefa de string) lança interruptedException {for (char ch: task.toCharArray ()) {if (ch == '.') Thread.sleep (1000);}}} Encaminhamento redondo-robin
A vantagem de usar filas de tarefas é que elas podem trabalhar em paralelo facilmente. Se tivermos muito acionamento de trabalho, podemos resolver o problema adicionando mais trabalhadores, tornando o sistema mais escalável.
Em seguida, primeiro administraremos 3 instâncias de trabalhadores (Work.java) e depois executaremos o newtask.java. As três instâncias dos trabalhadores receberão informações. Mas como alocar? Vejamos o resultado da saída:
[x] enviado 'helloworld.1' [x] enviado 'helloworld..2' [x] enviado 'helloworld ... 3' [x] enviado 'helloworld ...... 4' [x] enviado 'helloworld ...... 5' [x] enviado 'Helloworld ...... 'Helloworld .......... 10' trabalhador 1: 605645 [*] esperando por mensagens. Para sair, pressione Ctrl+C605645 [x] recebeu 'Helloworld.1'605645 [x] Done605645 [x] Recebeu' Helloworld .... 4'605645 [x] done605645 [x] recebeu 'Helloworld ......... 7'605645 [x] 'Helloworld .......... 10'605645 [x] TRABALHADOR 2: 18019860 [*] esperando por mensagens. Para sair, pressione Ctrl+C18019860 [x] recebeu 'Helloworld..2'18019860 [x] Done18019860 [x] Recebido' Helloworld ..... 5'18019860 [x] done18019860 [x] Recebido 'Helloworld ....... [*] Esperando por mensagens. Para sair, pressione Ctrl+C18019860 [x] recebeu 'Helloworld ... 3'18019860 [x] Done18019860 [x] Recebeu' Helloworld ...... 6'18019860 [x] Done18019860 [x] Recebeu '18019860 [x] don18019860 [x]
Como você pode ver, por padrão, o RabbitMQ enviará informações para o próximo consumidor um por um, independentemente da duração de cada tarefa etc., e é uma alocação única, não uma alocação. Em média, cada consumidor receberá uma quantidade igual de informações. Essa maneira de distribuir mensagens é chamada Round-Robin.
2. MessageAcknowledgments
Leva vários segundos para executar uma tarefa. Você pode estar preocupado com interrupções quando um trabalhador executa uma tarefa. Em nosso código acima, uma vez que o RabbitMQ entregue uma mensagem ao consumidor, ele removerá imediatamente essas informações da memória. Nesse caso, se um dos trabalhadores que estiver executando a tarefa for morto, perderemos as informações que está processando. Também perderemos mensagens que foram encaminhadas para esse trabalhador e ainda não foram executadas.
No exemplo acima, iniciamos duas tarefas e executamos o código que envia a tarefa (newtask.java) e depois fechamos imediatamente a segunda tarefa, e o resultado é:
Trabalhador 2: 31054905 [*] waitformessages.toexitPressctrl+C 31054905 [x] Recebido'helloworld..2 '31054905 [x] feito 31054905 [x] recebido'helloworld .... 4' trabalhador 1: 18019860 [*] 18019860 [x] Recebido'helloworld.1 '18019860 [x] feito 18019860 [x] Recebido'helloworld ... 3' 18019860 [x] feito 18019860 [x] recebido'helloworld ......... 5 '18019860 [x] done 180190 [x] 18019860 [x] feito 18019860 [x] recebido'helloworld ......... 9 '18019860 [x] feito
Pode -se observar que o segundo trabalhador perdeu pelo menos as tarefas 6, 8 e 10, e a Tarefa 4 não foi concluída.
No entanto, não queremos perder tarefas (informações). Quando um trabalhador (destinatário) é morto, queremos passar a tarefa para outro trabalhador.
Para garantir que as mensagens nunca sejam perdidas, o RabbitMQ suporta reconhecimento de mensagens. O consumidor envia uma resposta ao RabbitMQ, dizendo que as informações foram recebidas e processadas e, em seguida, o RabbitMQ pode excluir livremente as informações.
Se o consumidor for morto sem enviar uma resposta, o RabbitMQ assumirá que as informações não foram completamente processadas e serão redirecionadas para outro consumidor. Dessa forma, você pode confirmar que as informações não estão perdidas, mesmo que o consumidor seja ocasionalmente morto.
Esse mecanismo não significa que o tempo limite não seja o caso. O RabbitMQ apenas re-substitui essas informações quando a conexão do consumidor está desconectada. É permitido se o consumidor levar muito tempo para processar uma mensagem.
A resposta da mensagem está ligada por padrão. No código acima, desativamos esse mecanismo definindo autoask = true, como mostrado. Vamos modificar o código (work.java):
boolean ack = false; // Abra o canal do mecanismo de resposta.basicConsume (Queue_Name, ACK, consumidor); // Além disso, você precisa enviar uma resposta manualmente após cada processamento concluir uma mensagem. Channel.Basicack (Delivery.getEnvelope (). getDeliveryTag (), false);
Trabalho completamente modificado.java
importar com.rabbitmq.client.channel; importar com.rabbitmq.client.connection; importar com.rabbitmq.client.connectionFactory; importar com.rabbitmq.client.queueingConsumer; public class Work {// citação privada Final Static String; java.io.ioException, java.lang.interruptedException {// distingue entre diferentes processos de trabalhadores int hashcode = work.class.hashcode (); // criação de conexão e canal de conexão de conexão = fábrica.NewareNeNENCTY (); fábrica. fila canal. Channel.BasicConsume (Queue_Name, ACK, Consumidor); while (true) {fileueingConsumer.delivery entrega = consumer.nextDelivery (); string message = new String (entregen.getbody ()); system.out.println (hashcode + "[x] recebeu '" + message + ""); dowork (mensagem); system.out.println (hashcode + ""); Channel.Basicack (Delivery.getEnvelope (). GetDeliveryTag (), false);}}}teste:
Mudamos o número de mensagens para 5, depois abrimos dois consumidores (work.java), depois enviamos uma tarefa (newtask.java), fechamos imediatamente um consumidor e observamos a saída:
[x] Sent'helloworld..2 '[x] Sent'helloworld ... 3' [x] Sent'helloworld ... 4 '[x] Sent'helloworld ...... 5' Worker2 18019860 [*] Waitformessages.ToExitPressctrl+C 18019860 [x] Recebido'helloworld .. 18019860 [x] Recebido'helloworld .... 4 'trabalhador1 31054905 [*] waitherformessages.toexitPressctrl+C 31054905 [x] Recebido'helloworld.1' 31054905 [x] doado 31054905 [x] Recebido'helloworld ... 3 '31054905 [x] 31054905 [x] Recebido'helloworld .... 5 '31054905 [x] feito 31054905 [x] recebido'helloworld .... 4' 31054905 [x] feito
Você pode ver que a tarefa 4 que não foi concluída pelo trabalhador 2 é reformulada para o trabalhador 1 para conclusão.
3. Durabilidade da mensagem
Aprendemos que, mesmo que os consumidores sejam mortos, a mensagem não será perdida. Mas se o serviço RabbitMQ for interrompido neste momento, nossas mensagens ainda serão perdidas.
Quando o RabbitMQ sai ou sai anormalmente, todas as filas e informações serão perdidas, a menos que você diga para não perdê -las. Precisamos fazer duas coisas para garantir que as informações não sejam perdidas: precisamos definir sinalizadores persistentes para todas as filas e mensagens.
Primeiro, precisamos confirmar que o RabbitMQ nunca perderá nossa fila. Para fazer isso, precisamos declará -lo como persistente.
booleandurável = true;
canal.queuedeclare ("task_queue", durável, falso, falso, nulo);
NOTA: O RabbitMQ não permite redefinir uma fila com parâmetros diferentes; portanto, não podemos modificar os atributos da fila que já existe.
Segundo, precisamos identificar nossas informações como persistentes. Defina o valor MessageProperties (implementosBasicProperties) como persistent_text_plain.
canal.basicpublish ("", "task_queue", messageProperties.persistent_text_plain, message.getBytes ());
Agora você pode executar um programa que envia mensagens, feche o serviço, reinicie o serviço e execute o programa de consumidores para fazer o experimento.
4. Fairdispatch
Talvez descobrimos que o mecanismo atual de encaminhamento de mensagens (Round-Robin) não é o que queremos. Por exemplo, nesse caso, para dois consumidores, há uma série de tarefas, tarefas estranhas são particularmente demoradas, enquanto até as tarefas são fáceis, o que faz com que um consumidor fique ocupado enquanto o outro consumidor concluirá rapidamente a tarefa e aguarda após o término.
A razão para isso é que o RabbitMQ apenas encaminha as mensagens quando a mensagem chega na fila. Não se importe com quantas tarefas o consumidor não entregou uma resposta ao RabbitMQ. Apenas avance cegamente todos os números ímpares para um consumidor e até números para outro consumidor.
Para resolver esse problema, podemos usar o método BASICQOS, passando o parâmetro como prefetchCount = 1. Isso diz ao RabbitMQ para não dar mais de uma mensagem a um consumidor ao mesmo tempo. Em outras palavras, a próxima mensagem será enviada apenas quando o consumidor estiver ocioso.
int prefetchCount = 1; canal.basicqos (prefetchCount);
Nota: Se todos os trabalhadores estiverem ocupados, sua fila poderá ser preenchida. Você pode observar o uso da fila e adicionar trabalhadores ou usar alguma outra estratégia.
Teste: Altere o código para enviar uma mensagem, altere o número final de pontos para 6-2 e inicie os dois trabalhadores primeiro e envie a mensagem:
[x] Enviado 'Helloworld ...... 6' [x] enviado 'Helloworld ..... 5' [x] enviado 'Helloworld .... 4' [x] enviado 'Helloworld ... 3' [x] enviado 'Helloworld..2' Trabalhador 1: 18019860 [*] por mensagens. Para sair, pressione Ctrl+C18019860 [x] recebeu 'Helloworld ...... 6'18019860 [x] Done18019860 [x] Recebeu' Helloworld ... 3'18019860 [x] trabalhador 2: 31054905 [*] por mensagens. Para sair, pressione Ctrl+C31054905 [x] recebeu 'Helloworld ...... 5'31054905 [x] Done31054905 [x] Recebido' Helloworld ...... 4'31054905 [x] feito
Pode-se observar que a mensagem não foi encaminhada de acordo com o mecanismo de Round-Robin anterior no momento, mas foi encaminhado quando o consumidor não estava ocupado. Além disso, nesse modelo, os consumidores são suportados para aumentar dinamicamente porque a mensagem não é enviada e o aumento dinâmico é aumentado imediatamente. O mecanismo de encaminhamento padrão causará, mesmo que o consumidor seja adicionado dinamicamente, a mensagem foi alocada e não poderá ser adicionada imediatamente, mesmo que haja muitas tarefas inacabadas.
5. Código completo
Newtask.java
importar java.io.ioException; importar com.rabbitmq.client.channel; importação com.rabbitmq.client.connection; importar com.rabbitmq.client.connectionFactory; importar; estático void main (string [] args) lança a ioException {// Crie conexão e conexão de canal Factory = new ConnectionFactory (); Factory.Sethost ("localhost"); conexão de conexão = factory.newConnection (); canal de canal = conexão.Createchannel (); // Reclua o queue se definido; canal.queuedeclare (fileue_name, durável, falso, falso, nulo); // Envie 10 mensagens, anexando 1-10 pontos após a mensagem por sua vez para (int i = 5; i> 0; i-) {string tots = ""; para (int j = 0; j <= i; j ++) {Dots += ""; MessageProperties 2. Defina o canal de persistência da mensagem.Work.java
importar com.rabbitmq.client.channel; importar com.rabbitmq.client.connection; importar com.rabbitmq.client.connectionFactory; importar com.rabbitmq.client.queueeingConsumer; public class Work {// Quieue Nome Private Static String queee_Name = "WorkQueeDee; lança java.io.ioException, java.lang.interruptedException {// diferencia a saída de diferentes processos de trabalhadores int hashcode = work.class.hashcode (); // criação de conexão e canal de canal; Declare a fila booleana durável = true; canal. 1; canal.basicqos (prefetchCount); fileingConsumer Consumer = new QueueingConsumer (canal); // Especifique a fila de consumo boolean ack = false; // ativar o canal de mecanismo de resposta.basicConsume (Queue_name_name, ACK, consumidor); while (true) {fileueingConsumer.Delivery Delivery = Consumer.NextDelivery (); String message = new String (Delivery.getBody ()); System.out.println (hashcode + "[x] recebido '" + mensagem + ""); Feito "); // canal.basicack (entregen.getEnvelope (). GetDeliveryTag (), false); canal.basicack (entrega.getenvelope (). GetDeliveryTag (), false); theatcEst -chart}/** ** * tarefa.ToCharArray ()) {if (ch == '.') Thread.sleep (1000);}}}Resumir
O exposto acima é toda a explicação detalhada do código da fila de trabalho Java neste artigo, espero que seja útil para todos. Se houver alguma falha, deixe uma mensagem para apontá -la. Obrigado amigos pelo seu apoio para este site!