Wir haben über das Senden und Empfangen von Nachrichten über eine benannte Warteschlange geschrieben. Wenn Sie es noch nicht wissen, klicken Sie bitte: Erste Schritte mit Rabbitmq Java. In diesem Artikel werden wir eine Arbeitswarteschlange erstellen, um zeitaufwändige Aufgaben unter den Verbrauchern zu verteilen.
Die Hauptaufgabe einer Arbeitswarteschlange besteht darin, sofort ressourcenintensive Aufgaben auszuführen und dann zu warten, bis sie abgeschlossen sind. Stattdessen führen wir Aufgabenplanung durch: Wir verkapseln die Aufgabe als Nachricht und senden sie an die Warteschlange. Die Arbeit wird im Hintergrund ausgeführt und entfernt ständig Aufgaben aus der Warteschlange und führt sie dann aus. Wenn Sie mehrere Arbeiterprozesse ausführen, werden Aufgaben in der Task -Warteschlange vom Arbeitsprozess geteilt.
Ein solches Konzept ist in Webanwendungen äußerst nützlich, wenn komplexe Aufgaben zwischen sehr kurzen HTTP -Anforderungen ausgeführt werden müssen.
1. Bereiten Sie sich vor
Wir verwenden Thread.sleep, um zeitaufwändige Aufgaben zu simulieren. Wir fügen eine bestimmte Anzahl von Punkten am Ende der Nachricht hinzu, die an die Warteschlange gesendet wurde. Jeder Punkt bedeutet, dass es 1 Sekunde im Arbeiter -Thread dauert, z. B. Hallo ... muss 3 Sekunden warten.
Absender:
Newtask.java
importieren java.io.ioxception; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; com.rabbitmq.client.ConnectionFactory; öffentliche Klasse newtask {// Queue name private statische String -Queue_name = "öffentlich"; und Channel ConnectionFactory Factory = new ConnectionFactory (); factory.sethost ("localhost"); Verbindung Connection = Factory.newConnection (); Channel Channel = Connection.Createchannel (); // Deklarieren Sie den Warteschlangenkanal. 10; "'");} // Kanal- und Ressourcenkanal schließen.Empfänger:
Arbeit.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; java.io.ioxception, java.lang.interruptedException {// Unterscheidet zwischen verschiedenen Arbeitsprozessen int HashCode = work.class.hashcode (); // Verbindungen und Kanal ConnectionFactory Factory = new ConnectionFactory (); factory.sethost ("Localhost"; Der Warteschlangenkanal. while (true) {queueingconsumer.delivery lieferung = Verbraucher. * @param task * @throws interruptedException */private static void dowork (String Task) löst InterruptedException aus {für (char ch: task.toarArray ()) {if (ch == '.') thread.sleep (1000);}}}}}}}}}}}} Round-Robin-Weiterleitung
Der Vorteil der Verwendung von Task -Warteschlangen besteht darin, dass sie leicht parallel arbeiten können. Wenn wir viel Arbeitskräfte haben, können wir das Problem lösen, indem wir mehr Arbeitnehmer hinzufügen, wodurch das System skalierbarer wird.
Als nächstes werden wir zuerst 3 Arbeiter (work.java) Instanzen durchführen und dann Newtask.java ausführen. Die drei Arbeiterinstanzen erhalten Informationen. Aber wie kann man zuordnen? Schauen wir uns das Ausgabeergebnis an:
[x] Sent 'helloworld.1'[x] Sent 'helloworld..2'[x] Sent 'helloworld...3'[x] Sent 'helloworld......4'[x] Sent 'helloworld......5'[x] Sent 'helloworld......6'[x] Sent 'helloworld.......7'[x] Sent 'helloworld.......8'[x] Sent 'helloworld.........9'[x] Sent 'HelloWorld .......... 10' Arbeiter 1: 605645 [*] Warten auf Nachrichten. Drücken Sie Press Strg+C605645 [x] erhalten 'helloWorld.1'605645 [x] DOTE605645 [X] erhalten' helloWorld .... 4'605645 [x] DORE605645 [x] erhielt 'helloworld ......... 7'605645 [x] teurte605645 [x] teurte605645 [x] teurte605645 [x] teurte605645 [x] teurte605645 [x] teurte 605645 [x] teurte605645 [x] teurte605645 [x] teur ap] teen 605645 [x] teur ap] 'HelloWorld .......... 10'605645 [x] Drege Worker 2: 18019860 [*] Warten auf Nachrichten. To exit press CTRL+C18019860 [x] Received 'helloworld..2'18019860 [x] Done18019860 [x] Received 'helloworld.....5'18019860 [x] Done18019860 [x] Received 'helloworld.........8'18019860 [x] Done Worker 3: 18019860 [*] Warten auf Nachrichten. To exit press CTRL+C18019860 [x] Received 'helloworld...3'18019860 [x] Done18019860 [x] Received 'helloworld......6'18019860 [x] Done18019860 [x] Received 'helloworld......9'18019860 [x] Done
Wie Sie sehen können, sendet Rabbitmq standardmäßig Informationen nacheinander an den nächsten Verbraucher, unabhängig von der Dauer jeder Aufgabe usw., und es handelt sich um eine einmalige Zuteilung, nicht eine Einszene. Im Durchschnitt erhält jeder Verbraucher eine gleiche Menge an Informationen. Diese Art der Verteilung von Nachrichten heißt Round-Robin.
2. MessageAcKningdgments
Es dauert mehrere Sekunden, um eine Aufgabe auszuführen. Möglicherweise machen Sie sich Sorgen um Unterbrechungen, wenn ein Arbeiter eine Aufgabe ausführt. In unserem obigen CODE wird Rabbitmq, sobald er eine Nachricht an den Verbraucher abgibt, diese Informationen sofort aus dem Speicher entfernt. In diesem Fall werden wir die von ihm verarbeitenden Informationen verlieren, wenn einer der Arbeitnehmer, die die Aufgabe ausführen, getötet wird. Wir werden auch Nachrichten verlieren, die an diesen Arbeiter weitergeleitet wurden, und es wurde noch nicht ausgeführt.
Im obigen Beispiel starten wir zunächst zwei Aufgaben und führen dann den Code aus, der die Aufgabe sendet (newtask.java), und schließen dann sofort die zweite Aufgabe, und das Ergebnis ist:
Arbeiter 2: 31054905 [*] WaitingFormessages.toexitpressCtrl+C 31054905 [x] erhielt 'Helloworld..2' 31054905 [x] Done 31054905 [x] erhält'Helloworld .... 4 'Arbeiter 1: 18019860 [] WaitingFormess..toexitress.toexitress.toexitress..toexitress..toexitress..toexitressSages.toexitress..toexitress..toexitress..toexitress..toexitress..toexitress..toexitress..toexitress..toexitress..toexitress..toexitressSages. 18019860 [x] erhielt'helloworld.1 '18019860 [x] Done 18019860 [x] erhielt' Helloworld ... 3 '18019860 [x] fertig 18019860 [x] erhielt' HaBLEWALLLLD ......... 5 '18019860 [x] DURDE 18019860 [x]. 18019860 [x] DURTE 18019860 [X] erhielt 'Helloworld ......... 9' 18019860 [x] fertig
Es ist ersichtlich, dass der zweite Arbeiter mindestens die Aufgaben 6, 8 und 10 verlor und Aufgabe 4 nicht abgeschlossen war.
Wir möchten jedoch keine Aufgaben (Informationen) verlieren. Wenn ein Arbeiter (Empfänger) getötet wird, möchten wir die Aufgabe an einen anderen Arbeiter weitergeben.
Um sicherzustellen, dass Nachrichten niemals verloren gehen, unterstützt RabbitMQ Nachrichtenbestätigungen. Der Verbraucher sendet eine Antwort an Rabbitmq und teilt mit, dass die Informationen empfangen und verarbeitet wurden, und dann kann Rabbitmq die Informationen frei löschen.
Wenn der Verbraucher ohne Antwort getötet wird, wird Rabbitmq davon ausgehen, dass die Informationen nicht vollständig verarbeitet wurden und zu einem anderen Verbraucher weitergeleitet werden. Auf diese Weise können Sie bestätigen, dass die Informationen nicht verloren gehen, selbst wenn der Verbraucher gelegentlich getötet wird.
Dieser Mechanismus bedeutet nicht, dass die Zeitüberschreitung nicht der Fall ist. Rabbitmq repariert diese Informationen nur wieder, wenn die Verbraucherverbindung getrennt ist. Es ist erlaubt, wenn der Verbraucher besonders lange dauert, um eine Nachricht zu verarbeiten.
Die Nachrichtenantwort ist standardmäßig eingeschaltet. Im obigen Code schalten wir diesen Mechanismus aus, indem wir autoask = true wie gezeigt einstellen. Ändern wir den Code (work.java):
boolean ack = false; // den Antwortmechanismus -Kanal öffnen. // Zusätzlich müssen Sie nach Abschluss einer Nachricht eine Antwort manuell senden. Channel.Basicack (Delivery.GetEnvelope (). getDeliveryTag (), false);
Vollständig modifizierte Arbeit.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; java.io.ioxception, java.lang.interruptedException {// Unterscheidet zwischen verschiedenen Arbeitsprozessen int HashCode = work.class.hashcode (); // Verbindungen und Kanal ConnectionFactory Factory = new ConnectionFactory (); factory.sethost ("Localhost"; der Warteschlangenkanal.queeeclare (Queue_Name, Falsch, Falsch, Falsch, Null); System.out.println (HashCode + "[*] auf Nachrichten warten. Drücken Sie Strg + c"); Queugingconsumer Consumer = New Queueingconsumer (Kanal). Channel.BasicConsume (queue_name, ack, Verbraucher); while (true) {queueeingconsumer.delivery lieferung = Verbraucher.NextDelivery (); String message = new String (Delivery. Channel.Basicack (Lieferungprüfen:
Wir ändern die Anzahl der Nachrichten in 5, dann zwei Verbraucher (Arbeit.java), dann eine Aufgabe (newtask.java), schließen Sie sofort einen Verbraucher und beobachten die Ausgabe:
[x] sent'helloworld..2 '[x] sent'helloworld ... 3' [x] sent'helloworld ... 4 '[x] sent'helloworld ...... 5' Worker2 18019860 [*] WaitingFormessages.toexitpressCtrl+c 18019860 [x] HaBLEHORD'HELD. 18019860 [x] empfangen'Helloworld .... 4 'Worker1 31054905 [*] WaitingFormessages.toexitpressCtrl+C 31054905 [x] empfangen'Helloworld.1' 31054905 [x] Drede 31054905 [x] erhielt. 31054905 [x] empfangen'Helloworld .... 5 '31054905 [x] Done 31054905 [x] empfangene Helloworld .... 4' 31054905 [x] fertig
Sie können sehen, dass die Aufgabe 4, die von Worker 2 nicht abgeschlossen wurde, zur Fertigstellung an Worker 1 erneut vorgeschrieben ist.
3. Messagierte Haltbarkeit
Wir haben erfahren, dass die Botschaft nicht verloren geht, selbst wenn Verbraucher getötet werden. Aber wenn der Rabbitmq -Dienst zu diesem Zeitpunkt gestoppt wird, gehen unsere Nachrichten weiterhin verloren.
Wenn Rabbitmq abnormal ausgeht oder abnormal ist, gehen alle Warteschlangen und Informationen verloren, es sei denn, Sie sagen, dass Sie es nicht verlieren sollen. Wir müssen zwei Dinge tun, um sicherzustellen, dass die Informationen nicht verloren gehen: Wir müssen anhaltende Flags für alle Warteschlangen und Nachrichten festlegen.
Zunächst müssen wir bestätigen, dass Rabbitmq niemals unsere Warteschlange verlieren wird. Um dies zu tun, müssen wir es als anhaltend anmelden.
Booleandurable = True;
Channel.QueueEclare ("Task_queue", langlebig, falsch, falsch, null);
Hinweis: RabbitMQ erlaubt nicht zu, eine Warteschlange mit unterschiedlichen Parametern neu zu definieren, sodass wir die Attribute der bereits vorhandenen Warteschlange nicht ändern können.
Zweitens müssen wir unsere Informationen als anhaltend identifizieren. Setzen Sie den Wert von MessageProperties (implementsBasicProperties) auf persistent_text_plain.
Channel.BasicPublish ("", "Task_queue", MessageProperties.persistent_text_plain, message.getBytes ());
Jetzt können Sie ein Programm ausführen, das Nachrichten sendet, dann den Dienst schließen, den Dienst neu starten und das Verbraucherprogramm ausführen, um das Experiment durchzuführen.
4. FairDispatch
Vielleicht werden wir feststellen, dass der aktuelle Mechanismus zur Weiterleitung (Round-Robin) nicht das ist, was wir wollen. In diesem Fall gibt es bei zwei Verbrauchern eine Reihe von Aufgaben, ungerade Aufgaben sind besonders zeitaufwändig, während sogar Aufgaben einfach sind, was dazu führt, dass ein Verbraucher beschäftigt ist, während der andere Verbraucher die Aufgabe schnell erledigt und nach dem Abschluss schnell wartet.
Der Grund dafür ist, dass Rabbitmq nur Nachrichten weiterleitet, wenn die Nachricht in der Warteschlange eintrifft. Es ist mir egal, wie viele Aufgaben der Verbraucher keine Antwort auf Rabbitmq geliefert hat. Leiten Sie einfach blind alle seltsamen Zahlen an einen Verbraucher und sogar Zahlen an einen anderen Verbraucher weiter.
Um dieses Problem zu lösen, können wir die BasicQOS -Methode verwenden und den Parameter als PrefetchCount = 1 übergeben. Dies sagt Rabbitmq, dass er einem Verbraucher gleichzeitig nicht mehr als eine Nachricht anbietet. Mit anderen Worten, die nächste Nachricht wird nur gesendet, wenn der Verbraucher untätig ist.
int prefetchCount = 1; Channel.Basicqos (PrefetchCount);
Hinweis: Wenn alle Arbeiter beschäftigt sind, kann Ihre Warteschlange gefüllt sein. Sie können die Warteschlangennutzung beobachten und dann Arbeiter hinzufügen oder eine andere Strategie anwenden.
Testen: Ändern Sie den Code, um eine Nachricht zu senden, die Endnummer der Punkte auf 6-2 zu ändern, dann starten Sie die beiden Arbeiter zuerst und senden Sie dann die Nachricht:
[x] gesendet 'helloWorld ...... 6' [x] gesendet 'helloWorld ..... 5' [x] gesendet 'helloWorld .... 4' [x] gesendet 'helloWorld ... 3' [x] gesendet 'helloWorld..2' Arbeiter 1: 18019860 [*] auf Nachrichten wartet. Um zu beenden, drücken Sie Strg+C18019860 [X] erhalten 'HelloWorld ...... 6'18019860 [x] DET18019860 [x] erhielt' HelloWorld ... 3'18019860 [x] Dete Worker 2: 31054905 [*] wartete auf Nachrichten. Zum Beenden drücken Strg+C31054905 [x] erhalten 'HelloWorld ...... 5'31054905 [x] DOT31054905 [x] erhielt' HelloWorld ...... 4'31054905 [x] fertig
Es ist zu erkennen, dass die Nachricht zu diesem Zeitpunkt nicht nach dem vorherigen Round-Robin-Mechanismus weitergeleitet wurde, sondern weitergeleitet wurde, als der Verbraucher nicht beschäftigt war. Darüber hinaus wird nach diesem Modell die Verbraucher dabei unterstützt, dynamisch zu erhöhen, da die Nachricht nicht versandt wird und der dynamische Anstieg sofort erhöht wird. Der Standard -Weiterleitungsmechanismus verursacht, selbst wenn der Verbraucher dynamisch hinzugefügt wird, wurde die Nachricht zugewiesen und kann nicht sofort hinzugefügt werden, selbst wenn es viele unvollendete Aufgaben gibt.
5. CODE CODE
Newtask.java
importieren java.io.ioxception; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; com.rabbitmq.client.ConnectionFactory; com.rabbitmq.client.messageProperties; Public Class Newtask {// // // Queue -Namen private static cree_name "WorkQueue_Persistence"; public static void main (String [] args) löst ioException {// Create Connection and Channel Connectory Factory Factory = new ConnectionFactory (); factory.Sethost ("Localhost"); Anschlussverbindung = factory.NewConnection (). Persistenzkanal dots.length (); // messageProperties 2. Set Message Persistence Channel. Channel.CLOSE (); connection.close ();}}Arbeit.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; löst Java.io.ioException, java.lang.InterruptedException aus {// Differenzieren Sie die Ausgabe verschiedener Arbeitsprozesse int HashCode = Work.class.hashcode (); // Verbindungs- und Kanalverbindung erstellen. Connection.CreateChannel (); // deklarieren Sie die Warteschlange boolean dauerhaft = true; Channel. 1; Channel.Basicqos (PrefetchCount); QueueeingConsumer Consumer = New Queueeingconsumer (Kanal); // Geben Sie den Konsum -Queue boolean ck = false; // den Antwortmechanismus -Kanal ein. while (true) {queueeingconsumer.delivery lieferung = Consumer. Done "); // Channel.Basicack (Delivery.GetEnvelope (). GetDeliveryTag (), false); Channel.Basicack (Lieferung CH: Task.toarArray ()) {if (ch == '.') thread.sleep (1000);}}}Zusammenfassen
Das obige ist die detaillierte Erklärung des Java Work -Warteschlangencodes in diesem Artikel. Ich hoffe, es wird für alle hilfreich sein. Wenn es Mängel gibt, hinterlassen Sie bitte eine Nachricht, um darauf hinzuweisen. Vielen Dank an Freunde für Ihre Unterstützung für diese Seite!