Nous avons écrit sur l'envoi et la réception de messages via une file d'attente nommée. Si vous ne savez pas encore, veuillez cliquer: commencer avec Rabbitmq Java. Dans cet article, nous créerons une file d'attente de travail pour distribuer des tâches longues parmi les consommateurs.
La tâche principale d'une file d'attente de travail est d'éviter d'exécuter immédiatement des tâches à forte intensité de ressources et de devoir attendre qu'elles se terminent. Au lieu de cela, nous effectuons la planification des tâches: nous encapsulons la tâche en tant que message et l'envoyons dans la file d'attente. Le travail est exécuté en arrière-plan et supprime constamment les tâches de la file d'attente, puis les exécute. Lorsque vous exécutez plusieurs processus de travail, les tâches dans la file d'attente de tâches seront partagées par le processus de travail.
Un tel concept est extrêmement utile dans les applications Web lorsque des tâches complexes doivent être effectuées entre les demandes HTTP très courtes.
1. Préparer
Nous utilisons du fil.Sleep pour simuler des tâches longues. Nous ajoutons un certain nombre de points à la fin du message envoyé à la file d'attente, chaque point signifie qu'il faut 1 seconde dans le fil du travailleur, comme Hello ... devra attendre 3 secondes.
Expéditeur:
Newtask.java
Importer java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; classe publique newtask {// nom de file d'attente privée static static queue_name = "workqueue"; public static Void Main (String [] args) throws ioexe ConnectionFactory Factory = new ConnectionFactory (); factory.sethost ("localhost"); connexion connection = factory.newconnection (); canal channel = connection.createChannel (); // Declare the Queue Channel.QueeDedeclare (queue_name, false, false, false, null); // envoyait 10 messages, enregistrez 1-1-0 points i ++) {String DOTS = ""; for (int j = 0; j <= i; j ++) {dots + = ".";} string message = "helloworld" + dotes + dots.length (); channel.basicPublish ("" ", queue_name, null, message.getbytes ()); system.out.println (" [x] "+ message +); "'");} // Fermer Channel and Resource channel.close (); connection.close ();}}Récepteur:
Work.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingConsumer; travail de classe publique {// quote name private static string queue_name = "workQueue"; public STATIC VOID Main (string string string queue_name = "workQueue"; public STATIC VOID Main (string string static queue_name = "workQueue"; public STATIC VOID Main (String string static Queue_name = " java.io.ioException, java.lang.interruptedException {// distinguer les différents processus de travailleurs int hashcode = work.class.hashcode (); // créer la connexion et la connexion de la chaîne Factory = new ConnectionFactory (); factory.sethost ("localHost"); connection connection = factory.NewConnection (); channel = connection. la file d'attente Channel.QueuedEClare (queue_name, false, false, false, null); System.out.println (hashcode + "[*] en attente de messages. Pour quitter la presse ctrl + c"); queueingConsumer Consumer = New QueueingConsumer (canal); // spécifier le canal de la file d'attente. tandis que (true) {queueingConsumer.Delivery Delivery = Consumer.NextDelivery (); String Message = new String (Delivery.getBody ()); System.out.println (HashCode + "[x] a reçu '" + Message + "'"); Dowork (message); System.out.Println (hashcode + "[ @param tâche * @throws interruptedException * / private static void Dowork (String task) lève InterruptedException {for (char ch: task.tocharArray ()) {if (ch == '.') thread.sleep (1000);}}} Round-Robin Forwarding
L'avantage de l'utilisation des files d'attente de tâches est qu'ils peuvent facilement fonctionner en parallèle. Si nous avons beaucoup d'arriéré de travail, nous pouvons résoudre le problème en ajoutant plus de travailleurs, ce qui rend le système plus évolutif.
Ensuite, nous allons d'abord exécuter 3 instances de travailleurs (work.java), puis exécuterons newtask.java. Les trois instances des travailleurs recevront des informations. Mais comment allouer? Regardons le résultat de la sortie:
[x] Envoyé 'Helloworld.1' [x] a envoyé 'Helloworld..2' [x] envoyé 'Helloworld ... 3' [x] a envoyé 'Helloworld ...... 4' [x] envoyé 'Helloworld ...... 5' [x] envoyé 'Helloworld ...... 6' [x] a envoyé 'Helloworld ....... 7' [x] envoyé 'Helloworld ....... 8' '[x] a envoyé' Helloworld ....... 7 '[x] envoyé' Helloworld ....... 8 '' [x] a envoyé 'Helloworld ....... 7' [x] envoyé 'Helloworld ....... 8' '[X] Send' Helloworld ....... 7 '[x] Envoyé' Helloworld ....... 8 ' 'Helloworld .......... 10' travailleur 1: 605645 [*] en attente de messages. Pour quitter la presse ctrl + c605645 [x] a reçu 'Helloworld.1'605645 [x] Done605645 [x] a reçu' Helloworld .... 4'605645 [x] Done605645 [x] a reçu 'Helloworld ......... 7'605645 [x] Done605645 [x] 'Helloworld .......... 10'605645 [x] Done Worker 2: 18019860 [*] en attente de messages. Pour quitter la presse ctrl + c18019860 [x] a reçu 'Helloworld..2'18019860 [x] Done18019860 [x] reçue' Helloworld ..... 5'18019860 [x] Done18019860 [x] a reçu 'Helloworld ......... 8'18019860 [x] Dot Worker 3: 180198660 [*] En attente de messages. Pour sortir appuyez sur Ctrl + C18019860 [x] a reçu 'Helloworld ... 3'18019860 [x] Done18019860 [x] reçue' Helloworld ...... 6'18019860 [x] Done18019860 [x] reçue 'Helloworld ...... 9'18019860 [x]
Comme vous pouvez le voir, par défaut, RabbitMQ enverra des informations au prochain consommateur un par un, quelle que soit la durée de chaque tâche, etc., et c'est une allocation unique, pas une allocation un par un. En moyenne, chaque consommateur recevra une quantité égale d'informations. Cette façon de distribuer des messages s'appelle la ronde.
2. Messageac Knowledgments
Il faut plusieurs secondes pour effectuer une tâche. Vous pouvez vous inquiéter des interruptions lorsqu'un travailleur effectue une tâche. Dans notre code ci-dessus, une fois que RabbitMQ transmet un message au consommateur, il supprimera immédiatement ces informations de la mémoire. Dans ce cas, si l'un des travailleurs qui effectuent la tâche est tué, nous perdrons les informations qu'elle traite. Nous perdrons également des messages qui ont été transmis à ce travailleur et il n'a pas encore été exécuté.
Dans l'exemple ci-dessus, nous commençons d'abord deux tâches, puis exécutons le code qui envoie la tâche (newtask.java), puis fermez immédiatement la deuxième tâche, et le résultat est:
Worker 2: 31054905 [*] WaitingFormations.ToExitPressCtrl + C 31054905 [x] Reçu'Helloworld..2 '31054905 [x] Done 31054905 [x] Reçu'Helloworld .... 4' Worker 1: 18019860 [*] 18019860) 18019860 [x] fait 18019860 [x] Reçu'Helloworld ......... 9 '18019860 [x] fait
On peut voir que le deuxième travailleur a perdu au moins les tâches 6, 8 et 10, et que la tâche 4 n'a pas été terminée.
Cependant, nous ne voulons pas perdre de tâches (informations). Lorsqu'un travailleur (bénéficiaire) est tué, nous voulons transmettre la tâche à un autre travailleur.
Pour s'assurer que les messages ne seront jamais perdus, RabbitMQ prend en charge les remerciements du message. Le consommateur envoie une réponse à RabbitMQ, lui indiquant que les informations avaient été reçues et traitées, puis RabbitMQ peut supprimer librement les informations.
Si le consommateur est tué sans envoyer de réponse, RabbitMQ supposera que les informations n'ont pas été complètement traitées et seront redirigées vers un autre consommateur. De cette façon, vous pouvez confirmer que les informations ne sont pas perdues, même si le consommateur est parfois tué.
Ce mécanisme ne signifie pas que le délai d'expiration n'est pas le cas. RabbitMQ ne repensé ces informations que lorsque la connexion du consommateur est déconnectée. Il est autorisé si le consommateur demande particulièrement longtemps pour traiter un message.
La réponse du message est allumée par défaut. Dans le code ci-dessus, nous désactivons ce mécanisme en définissant Autoask = true comme indiqué. Modifions le code (work.java):
booléen ack = false; // Ouvrez le canal du mécanisme de réponse.basicconsume (queue_name, ack, consommateur); // De plus, vous devez envoyer une réponse manuellement une fois que chaque traitement a terminé un message. channel.basicack (Delivery.getEnvelope (). GetDeliveryTag (), false);
Travail complètement modifié.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingConsumer; travail de classe publique {// quote name private static string queue_name = "workQueue"; public STATIC VOID Main (string string string queue_name = "workQueue"; public STATIC VOID Main (string string static queue_name = "workQueue"; public STATIC VOID Main (String string static Queue_name = " java.io.ioException, java.lang.interruptedException {// distinguer les différents processus de travailleurs int hashcode = work.class.hashcode (); // créer la connexion et la connexion de la chaîne Factory = new ConnectionFactory (); factory.sethost ("localHost"); connection connection = factory.NewConnection (); channel = connection. La file d'attente Channel.QueueDeclare (queue_name, false, false, false, null); System.out.println (hashcode + "[*] en attente de messages. channel.basicconsume (queue_name, ack, consommateur); while (true) {queueingConsumer.Delivery Delivery = Consumer.NextDelivery (); String Message = new String (Delivery.getBody ()); System.out.println (HashCode + "[x] a reçu '" + message + "'"); Dowork (message); System.out.println (HashCode + "[x] fait"); / channel.basicack (Delivery.getEnvelope (). GetDeliveryTag (), false);}}}test:
Nous modifions le nombre de messages en 5, puis ouvrons deux consommateurs (work.java), puis envoyons une tâche (Newtask.java), fermons immédiatement un consommateur et observons la sortie:
[x] Send'Helloworld..2 '[x] Send'Helloworld ... 3' [x] Send'Helloworld ... 4 '[x] Send'Helloworld ...... 5' Worker2 18019860 [*] WaitingForesssages.toExitpressctrl + C 18019860 [x] HELOLKORLD..2 '18019860 [x] 18019860) 31054905 [x] Reçu'Helloworld .... 5 '31054905 [x] fait 31054905 [x] Reçu'Helloworld .... 4' 31054905 [x] fait
Vous pouvez voir que la tâche 4 qui n'a pas été terminée par Worker 2 est rediffusée au travailleur 1 pour l'achèvement.
3. Durabilité envoyée
Nous avons appris que même si les consommateurs sont tués, le message ne sera pas perdu. Mais si le service RabbitMQ est arrêté pour le moment, nos messages seront toujours perdus.
Lorsque RabbitMQ sort ou sort anormalement, toutes les files d'attente et informations seront perdues à moins que vous ne lui disiez de ne pas la perdre. Nous devons faire deux choses pour nous assurer que les informations ne sont pas perdues: nous devons définir des drapeaux persistants pour toutes les files d'attente et les messages.
Tout d'abord, nous devons confirmer que RabbitMQ ne perdra jamais notre file d'attente. Pour ce faire, nous devons le déclarer comme persistant.
BooleAndurable = true;
Channel.QueuedEClare ("Task_queue", durable, faux, faux, null);
Remarque: RabbitMQ ne permet pas de redéfinir une file d'attente avec différents paramètres, nous ne pouvons donc pas modifier les attributs de la file d'attente qui existe déjà.
Deuxièmement, nous devons identifier nos informations comme persistantes. Définissez la valeur MessageProperties (implémenteBasicProperties) sur persistant_text_plain.
channel.basicPublish ("", "task_queue", messageProperties.persistent_text_plain, message.getBytes ());
Vous pouvez maintenant exécuter un programme qui envoie des messages, puis fermer le service, redémarrer le service et exécuter le programme de consommation pour faire l'expérience.
4. Fairdispatch
Peut-être que nous constatons que le mécanisme de transfert de message actuel (tournoi à la ronde) n'est pas ce que nous voulons. Par exemple, dans ce cas, pour deux consommateurs, il existe une série de tâches, les tâches étranges sont particulièrement longues, tandis que même les tâches sont faciles, ce qui fait entendre un consommateur tandis que l'autre consommateur termine rapidement la tâche et attendre après sa fin.
La raison en est que RabbitMQ ne transmet que les messages lorsque le message arrive dans la file d'attente. Ne vous souciez pas du nombre de tâches que le consommateur n'a pas livré de réponse à Rabbitmq. Il suffit de transmettre aveuglément tous les nombres impairs à un consommateur et même des nombres à un autre consommateur.
Pour résoudre ce problème, nous pouvons utiliser la méthode BasicQOS, en passant le paramètre comme préfetchCount = 1. Cela dit à Rabbitmq de ne pas donner plus d'un message à un consommateur en même temps. En d'autres termes, le message suivant ne sera envoyé que lorsque le consommateur est inactif.
int prefetchCount = 1; channel.basicqos (prefetchCount);
Remarque: Si tous les travailleurs sont occupés, votre file d'attente peut être remplie. Vous pouvez observer l'utilisation des files d'attente, puis ajouter des travailleurs ou utiliser une autre stratégie.
Test: modifiez le code pour envoyer un message, modifiez le numéro de fin des points en 6-2, puis démarrez d'abord les deux travailleurs, puis envoyez le message:
[x] envoyé 'Helloworld ...... 6' [x] envoyé 'Helloworld ..... 5' [x] a envoyé 'Helloworld .... 4' [x] a envoyé 'Helloworld ... 3' [x] a envoyé 'Helloworld..2' Worker 1: 18019860 [*] en attente de messages. Pour quitter, appuyez sur Ctrl + C18019860 [x] a reçu 'Helloworld ...... 6'18019860 [x] Done18019860 [x] a reçu' Helloworld ... 3'18019860 [x] Dot Worker 2: 31054905 [*] en attente de messages. Pour sortir, appuyez sur Ctrl + C31054905 [x] a reçu 'Helloworld ...... 5'31054905 [x] Done31054905 [x] a reçu' Helloworld ...... 4'31054905 [x] fait
On peut voir que le message n'a pas été transmis en fonction du mécanisme de la ronde précédente à ce moment, mais a été transmis lorsque le consommateur n'était pas occupé. De plus, dans le cadre de ce modèle, les consommateurs sont supportés pour augmenter dynamiquement parce que le message n'est pas envoyé et l'augmentation dynamique augmente immédiatement. Le mécanisme de transfert par défaut provoquera, même si le consommateur est ajouté dynamiquement, le message a été alloué et ne peut pas être ajouté immédiatement, même s'il existe de nombreuses tâches inachevées.
5. Code complet
Newtask.java
Importer java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.messageproperties; public class NewTask {//ue name privée finale statique statique queue_name = " static void main (String [] args) lève ioException {// créer la connexion et la connexion canal ConnectionFactory factory = new ConnectionFactory (); factory.sethost ("localhost"); connexion connection = factory.newconnection (); canal channel = connection.createChannel (); // Declare the Queue Boolean Durable = true; // 1. Channel.QueuedEClare (queue_name, durable, false, false, null); // Envoi 10 messages, apprenant 1-10 points après le message à son tour pour (int i = 5; i> 0; i--) {String Dots = ""; pour (int j = 0; j <= i; j ++) {DOTS + = ". MessageProperties 2. Définir le message Persistance channel.basicPublish ("", queue_name, messageProperties.persistent_text_plain, message.getbytes ()); System.out.println ("[x] envoyé '" + message + "'");} // close canal et ressource channel.close (); connection.close ();}}}Work.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingConsumer; travail de classe publique {// nom de file java.io.ioException, java.lang.interruptedException {// différenciez la sortie de différents processus de travailleur int hashcode = work.class.hashcode (); // créer la connexion et la connexion canal Déclare la file d'attente booléenne durable = true; channel.queueDeclare (queue_name, durable, false, false, null); System.out.println (hashcode + "[*] en attente de messages. = Nouveau queueingConsumer (canal); // Spécifiez la file d'attente de consommation boolean ack = false; // activer le canal du mécanisme de réponse.basicconsume (queue_name, ack, consommateur); while (true) {queueingConsumer.Delivery Delivery = Consumer.NextDelivery (); String Message = new String (Delivery.getBody ()); System.out.println (HashCode + "[x] reçu '" + Message + "'"); Dowork (message); System.out.println (HashCode + "[x] Fait "); // canal task.tocharArray ()) {if (ch == '.') thread.sleep (1000);}}}Résumer
Ce qui précède est toute l'explication détaillée du code de file d'attente de travail Java Dans cet article, j'espère que ce sera utile à tout le monde. S'il y a des lacunes, veuillez laisser un message pour le signaler. Merci vos amis pour votre soutien pour ce site!