Мы написали о отправке и получении сообщений через названную очередь. Если вы еще не знаете, пожалуйста, нажмите: Начало работы с Rabbitmq Java. В этой статье мы создадим рабочую очередь для распространения трудоемких задач среди потребителей.
Основная задача рабочей очереди состоит в том, чтобы избежать немедленного выполнения ресурсных задач, а затем нужно ждать их завершения. Вместо этого мы выполняем планирование задач: мы инкапсулируем задачу как сообщение и отправляем ее в очередь. Работа выполняется в заднем плане и постоянно удаляет задачи из очереди, а затем выполняет их. Когда вы запускаете несколько рабочих процессов, задачи в очереди задач будут переданы работникам.
Такая концепция чрезвычайно полезна в веб -приложениях, когда необходимо выполнять сложные задачи между очень короткими HTTP -запросами.
1. Подготовьте
Мы используем Thread.Sele для имитации трудоемких задач. Мы добавляем определенное количество баллов в конце сообщения, отправленного в очередь, каждая точка означает, что в потоке работника требуется 1 секунду, например, Hello ... нужно подождать 3 секунды.
Отправитель:
Newtask.java
Импорт java.io.ioexception; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; public class newtas ConnectionFactory factory = new ConnectionFactory (); factory.Sethost ("localhost"); Connection Connection = factory.newConnection (); канал канал = connection.createChannel (); // Объявляет канал очереди. {String dots = ""; for (int j = 0; j <= i; j ++) {dots += ".";} String message = "helloworld" +dots +dots.length (); cannel.basicpublish ("", queue_name, null, message.getbytes ()); "''");} // Закройте канал и канал ресурса.Приемник:
Work.java
Импорт com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; импорт com.rabbitmq.client.ceueingconsumer; public class work {// Quote private queue Queue_name = " java.io.ioexception, java.lang.ErenterteDexception {// различать различные рабочие процессы int hashcode = work.class.hashcode (); // Создание соединения и соединения Connection Factory = new ConnectionFactory (); factory.sethost ("localhost"); connection = factory.newConection (); cannel.; cnectore chancee chane chaneecheeme chane chaneecheecheemeech =; Queue Channel.queuedeclare (queue_name, false, false, false, null); system.out.println (hashcode + "[*] В ожидании сообщений. Чтобы выйти из пресса ctrl + c"); queueingConsumer untiver = new QueueingConsumer (канал); // определить Queue -Queuemer. while (true) {queueingConsumer.delivery Delivery = consumer.NextDelivery (); String Message = new String (Deliver.getBody ()); System.out.println (hashcode + "[x] получил" + "Сообщение +" '"); Dowork (сообщение); System.out.println (hashcode +" [x]; * @param Task * @Throws прерывание. Переадресация раунда
Преимущество использования очередей задач заключается в том, что они могут легко работать параллельно. Если у нас много работы на работе, мы можем решить проблему, добавив больше работников, сделав систему более масштабируемой.
Затем мы сначала запустим 3 экземпляры работников (work.java), а затем запустим newtask.java. Три экземпляра работников получат информацию. Но как распределить? Давайте посмотрим на результат вывода:
[x] Sentle 'helloworld.1' [x] Sentle 'helloworld..2' [x] Sedi alloworld ... 3 '[x] Sent' Helloworld ...... 4 '[x] Sent' Helloworld ...... 5 '[x] Sent' Helloworld ...... 6 " 'Helloworld .......... 10' работник 1: 605645 [*] В ожидании сообщений. Чтобы выйти из прессы Ctrl+C605645 [x] получил «helloworld.1'605645 [x] Dod605645 [x] 'Helloworld .......... 10'605645 [x] Готово работник 2: 18019860 [*] В ожидании сообщений. Выйти из прессы Ctrl+C18019860 [x] получил «helloworld..2'18019860 [x] DEND18019860 [x] получил« Helloworld ..... 5'18019860 [x] DEND18019860 [x] получил Helloworld ....... 8'18019860 [X]. [*] В ожидании сообщений. Выйти из прессы Ctrl+C18019860 [x] получил «helloworld ... 3'18019860 [x] DENE18019860 [x] получил« Helloworld ...... 6'18019860 [x] DEND18019860
Как вы можете видеть, по умолчанию Rabbitmq отправит информацию следующему потребительскому, независимо от продолжительности каждой задачи и т. Д., И это одноразовое распределение, а не одно из одного распределения. В среднем каждый потребитель получит равное количество информации. Этот способ распределения сообщений называется круглым Robin.
2. MessageAcknowledgments
Для выполнения задачи требуется несколько секунд. Вы можете беспокоиться о перерывах, когда работник выполняет задачу. В нашем коде выше, как только Rabbitmq доставляет сообщение потребителю, он немедленно удалит эту информацию из памяти. В этом случае, если один из работников, выполняющих задачу, убит, мы потеряем информацию, которую она обрабатывает. Мы также потеряем сообщения, которые были отправлены этому работнику, и это еще не было выполнено.
В приведенном выше примере мы сначала запускаем две задачи, затем выполняем код, который отправляет задачу (newtask.java), а затем сразу же закрываем вторую задачу, и результат: результат:
Работник 2: 31054905 [*] waredformessages.toexitpressctrl+C 31054905 [x] получил «Helloworld..2 '31054905 [x] Готово 31054905 [x] 18019860 [x] получил «Helloworld.1» 18019860 [x] Готово 18019860 [x] Получен'Helloworld ... 3 '18019860 [x] Готово 18019860 [x] Получен'Helloworld ......... 5' 18019860 [x] Готово 18019860 [x]. 18019860 [x] Готово 18019860 [x] получил «Helloworld ......... 9 '18019860 [x] сделано
Видно, что второй работник потерял хотя бы задачи 6, 8 и 10, а задача 4 не была завершена.
Тем не менее, мы не хотим терять какие -либо задачи (информация). Когда работник (получатель) убит, мы хотим передать задачу другому работнику.
Чтобы убедиться, что сообщения никогда не будут потеряны, Rabbitmq поддерживает подтверждения сообщения. Потребитель отправляет ответ Rabbitmq, сообщая, что информация была получена и обработана, а затем Rabbitmq может свободно удалять информацию.
Если потребитель убит без отправки ответа, RabbitMQ предположит, что информация не была обработана полностью и будет перенаправлена другому потребителю. Таким образом, вы можете подтвердить, что информация не потеряна, даже если потребитель иногда убит.
Этот механизм не означает, что тайм -аут не так. RabbitMQ пересматривает эту информацию только тогда, когда потребительское соединение отключено. Это разрешено, если потребитель требуется особенно много времени для обработки сообщения.
Ответ сообщения включен по умолчанию. В приведенном выше коде мы отключаем этот механизм, установив AutoAsk = true, как показано. Давайте изменим код (work.java):
логический ack = false; // Откройте канал механизма ответа. BasicConsume (queue_name, ack, потребитель); // Кроме того, вам нужно вручную отправить ответ после того, как каждая обработка завершит сообщение. channel.basicack (delivery.getenvelope (). getDeliveryTag (), false);
Полностью модифицированная работа. Java
Импорт com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; импорт com.rabbitmq.client.ceueingconsumer; public class work {// Quote private queue Queue_name = " java.io.ioexception, java.lang.ErenterteDexception {// различать различные рабочие процессы int hashcode = work.class.hashcode (); // Создание соединения и соединения Connection Factory = new ConnectionFactory (); factory.sethost ("localhost"); connection = factory.newConection (); cannel.; cnectore chancee chane chaneecheeme chane chaneecheecheemeech =; Queue Channel.queuedeclare (queue_name, false, false, false, null); system.out.println (hashcode + "[*] В ожидании сообщений. Выход из нажима ctrl + c"); queueingConsumer untiver = new queueingConsum (канал); // Указание Quolean QuoLean ack = ack ack = ///secle ack ack ack = ///secle ack = ///semy ack ack = ///semysum ack ack = ///semismer ack ack = ///semismer; channel.basicconsume (queue_name, ack, потребитель); while (true) {queueingConsumer.delivery delivery = consumer.nextdelivery (); string message = new String (deliver.getbody ()); System.out.println (hashcode + "[x] получил" + message + "'"); dowork (сообщение); System.out.println (hashcode + " channel.basicack (deliver.getenvelope (). getDeliveryTag (), false);}}}тест:
Мы меняем количество сообщений на 5, затем открываем двух потребителей (work.java), затем отправляем задание (newtask.java), немедленно закрываем одного потребителя и наблюдаем за выходом:
[x] Send'helloworld..2 '[x] Send'helloworld ... 3' [x] Send'helloworld ... 4 '[x] Send'helloworld ...... 5' Worker2 18019860 [*] WALEDFORMESSESCESS.TOEXITPRESSCTRL+C 18019860 [x] 18019860 [x] получил «Hevelloworld .... 4 'Worker1 31054905 [*] wardingformessages.toexitpressctrl+C 31054905 [x] получил« helloworld.1' 31054905 [x] Готово 31054905 [x] 31054905 [x] Полученный «helloworld .... 5 '31054905 [x] Готово 31054905 [x]
Вы можете видеть, что задача 4, которая не завершилась работником 2, перезаписывается работнику 1 для завершения.
3. Сообщение за прочность
Мы узнали, что даже если потребители будут убиты, сообщение не будет потеряно. Но если в настоящее время служба RabbitMQ будет остановлена, наши сообщения все равно будут потеряны.
Когда Rabbitmq выходит из аномально, все очереди и информацию будут потеряны, если вы не скажете ему не терять. Нам нужно сделать две вещи, чтобы убедиться, что информация не потеряна: нам нужно установить постоянные флаги для всех очередей и сообщений.
Во -первых, нам нужно подтвердить, что Rabbitmq никогда не потеряет нашу очередь. Чтобы сделать это, мы должны объявить это как настойчивое.
BooleAndiverse = true;
channel.queuedeclare ("task_queue", долговечный, ложный, ложный, нулевый);
ПРИМЕЧАНИЕ. Rabbitmq не позволяет переопределять очередь с различными параметрами, поэтому мы не можем изменять атрибуты очереди, которая уже существует.
Во -вторых, нам нужно определить нашу информацию как постоянную. Установите значение MessageProperties (реализует BasicProperties) для Persistent_Text_Plain.
channel.basicpublish ("", "task_queue", messageproperties.persistent_text_plain, message.getbytes ());
Теперь вы можете выполнить программу, которая отправляет сообщения, затем закрывает сервис, перезапустите Сервис и запустит программу потребителей для проведения эксперимента.
4. FairDispatch
Возможно, мы обнаружим, что текущий механизм пересылки сообщений (круглый Robin)-это не то, что мы хотим. Например, в этом случае для двух потребителей существует ряд задач, нечетные задачи особенно трудоемкие, в то время как даже задачи просты, что заставляет одного потребителя быть занятым, в то время как другой потребитель быстро выполняет задачу и ждал его после завершения.
Причина этого в том, что RabbitMQ пересылает сообщения только тогда, когда сообщение прибывает в очередь. Не волнует, сколько задач потребитель не дал ответа Rabbitmq. Просто вслепую передайте все нечетные числа одному потребителю и даже числам другому потребителю.
Чтобы решить эту проблему, мы можем использовать метод BasicQOS, передавая параметр в качестве prefetchCount = 1. Это говорит Rabbitmq не давать более одного сообщения потребителю одновременно. Другими словами, следующее сообщение будет отправлено только тогда, когда потребитель простаивает.
int prefetchCount = 1; Channel.basicqos (prefetchCount);
Примечание. Если все работники заняты, ваша очередь может быть заполнена. Вы можете наблюдать за использованием очереди, а затем добавить работников или использовать какую -то другую стратегию.
Тест: Измените код, чтобы отправить сообщение, измените конечное количество очков на 6-2, затем сначала запустите двух работников, а затем отправьте сообщение:
[x] послал «helloworld ...... 6 '[x] послал' helloworld ..... 5 '[x] отправил' helloworld .... 4 '[x] Sent' Helloworld ... 3 '[x] отправил« Helloworld..2 »Работник 1: 18019860 [*] В ожидании сообщений. Чтобы выйти из прессы Ctrl+C18019860 [x] получил «helloworld ...... 6'18019860 [x] Done18019860 [x] получил 'Helloworld ... 3'18019860 [x] Dode Worker 2: 31054905 [*] В ожидании сообщений. Чтобы выйти из нажима Ctrl+C31054905 [x] получил «helloworld ...... 5'31054905 [x] DENE31054905 [X] получил« Helloworld ...... 4'31054905 [x]
Можно видеть, что в настоящее время сообщение не было направлено в соответствии с предыдущим механизмом раунда-робина, но было перенаправлено, когда потребитель не был занят. Кроме того, в соответствии с этой моделью потребители поддерживаются для динамического увеличения, поскольку сообщение не отправляется, а динамическое увеличение увеличивается немедленно. Механизм пересылки по умолчанию вызовет, даже если потребитель добавлен динамически, сообщение было выделено и не может быть добавлен немедленно, даже если есть много незаконченных задач.
5. Полный код
Newtask.java
Импорт java.io.ioexception; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; import com.rabbitmq.client.messageproperties; public class newsask {// Queue Private Final Stitic Queue_name; Статический void main (string [] args) бросает ioException {// Создание соединения и канала ConnectionFactory factory = new ConnectionFactory (); factory.sethost ("localhost"); connection connection = factory.newConnection (); канал канал = connection.createChannel (); // Объявит Queue Boolean Deversul channel.queuedeclare (queue_name, долговечный, false, false, null); // Отправлять 10 сообщений, добавление 1-10 точек после сообщения, в свою очередь (int i = 5; i> 0; i--) {String dots = ";"; для (int j = 0; j <= i; j ++) {dots += ". MessageProperties 2. Установите сообщение о постоянстве.Work.java
Импорт com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; импорт com.rabbitmq.client.queueingConsumer; Public Class Work {// queue name private Queue_name = " java.io.ioexception, java.lang.terrupruptexception {// дифференцировать выходные данные различных рабочих процессов int hashcode = work.class.hashcode (); // Создать соединение и соединение Connectory factory = new ConnectionFactory (); factory.sethost ("localHost"); Connection Connection = factory.newCORINE (); cannecteCeeCeeCeeCheate =; cnectAne ("localHost"); connection = factory.newConection (); queue boolean ould = true; cannel.queuedeclare (queue_name, долговечный, ложный, ложный, нулевый); system.out.println (hashcode + "[*] Ожидание сообщений. Чтобы выйти из прессы Ctrl + C"); // Установить максимальное количество перенаправленных сообщений для int prefetchcount = 1; = new queueingConsumer (канал); // указать очередь потребления логически ack = false; // Включить канал механизма ответа. while (true) {queueingConsumer.delivery Delivery = consumer.NextDelivery (); String message = new String (deliver.getBody ()); System.out.println (hashcode + "[x] получил" + message + "'"); dowork (сообщение); System.out.println (hashcode + "[x]; Выполнено "); // Channel.basicack (deliver.getenvelope (). GetDeliveryTag (), false); channel.basicack (deliver.getenvelope (). GetDeliveryTag (), false);}}/** * Каждая точка занимает 1s * @parar asktextextextexception */private void void void void (string structextexcepteex * private void void void voiderak (string stricexcepteex voidexcepteex * @ThrOhTexception. ch: task.thararray ()) {if (ch == '.') thread.sleep (1000);}}}Суммировать
Выше приведено подробное объяснение кода рабочей очереди Java в этой статье, я надеюсь, что это будет полезно для всех. Если есть какие -либо недостатки, пожалуйста, оставьте сообщение, чтобы указать это. Спасибо, друзья, за вашу поддержку на этом сайте!