In a previous article, we covered sending and receiving messages through a named queue. If you have not read it yet, see: Getting Started with RabbitMQ Java. In this article, we will create a work queue that distributes time-consuming tasks among multiple consumers.
The main idea of a work queue is to avoid executing a resource-intensive task immediately and then having to wait for it to complete. Instead, we schedule the task: we encapsulate it as a message and send it to the queue. A worker process running in the background takes tasks off the queue one after another and executes them. When you run multiple worker processes, the tasks in the queue are shared among them.
This concept is especially useful in web applications, where a complex task cannot be completed within the short window of an HTTP request.
1. Preparation
We use Thread.sleep to simulate time-consuming tasks. We append a number of dots to the end of each message sent to the queue, and each dot stands for 1 second of work in the worker thread. For example, hello... takes 3 seconds to process.
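As a small illustration of the dot convention described above, the sketch below counts the dots in a message to estimate how long a worker would sleep. The class and method names (TaskCost, seconds) are hypothetical and are not part of the article's code; the real worker simply sleeps one second per dot.

```java
// Hypothetical helper, not part of the article's code: estimates how long
// a worker would sleep for a given message, at one second per dot.
class TaskCost {
    static int seconds(String message) {
        int dots = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                dots++;
            }
        }
        return dots;
    }

    public static void main(String[] args) {
        System.out.println(TaskCost.seconds("hello..."));      // prints 3
        System.out.println(TaskCost.seconds("helloworld..2")); // prints 2
    }
}
```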
Sender:
NewTask.java
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
    // Queue name
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] args) throws IOException {
        // Create the connection and channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Send 10 messages, appending 1 to 10 dots to each message in turn
        for (int i = 0; i < 10; i++) {
            String dots = "";
            for (int j = 0; j <= i; j++) {
                dots += ".";
            }
            String message = "helloworld" + dots + dots.length();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
Receiver:
Work.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
    // Queue name
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
        // Used to tell the output of different worker processes apart
        int hashCode = Work.class.hashCode();
        // Create the connection and channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Specify the queue to consume from (second argument: autoAck = true)
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(hashCode + " [x] Received '" + message + "'");
            doWork(message);
            System.out.println(hashCode + " [x] Done");
        }
    }

    /**
     * Each dot takes 1 second.
     *
     * @param task the message body
     * @throws InterruptedException if the sleep is interrupted
     */
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
Round-robin forwarding
One advantage of a task queue is that the work can easily be done in parallel. If a backlog of work builds up, we can deal with it by adding more workers, which makes the system easier to scale.
Next, we first run three instances of the worker (Work.java) and then run NewTask.java. All three worker instances will receive messages. But how are the messages allocated? Let's look at the output:
[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
[x] Sent 'helloworld......6'
[x] Sent 'helloworld.......7'
[x] Sent 'helloworld........8'
[x] Sent 'helloworld.........9'
[x] Sent 'helloworld..........10'
Worker 1:
605645 [*] Waiting for messages. To exit press CTRL+C
605645 [x] Received 'helloworld.1'
605645 [x] Done
605645 [x] Received 'helloworld....4'
605645 [x] Done
605645 [x] Received 'helloworld.......7'
605645 [x] Done
605645 [x] Received 'helloworld..........10'
605645 [x] Done
Worker 2:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld........8'
18019860 [x] Done
Worker 3:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld......6'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
As you can see, by default RabbitMQ sends each message to the next consumer in sequence, regardless of how long each task takes. The messages are allocated up front, all at once, rather than handed out one at a time as workers become free. On average, each consumer receives the same number of messages. This way of distributing messages is called round-robin.
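The allocation seen in the output above can be reproduced with a toy model that involves no broker at all. The sketch below is only an illustration under that assumption: the class and method names (RoundRobinDemo, assign) are hypothetical, and the modulo rule is a simplification of what RabbitMQ does when all consumers are connected before publishing starts.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch; no broker is involved.
class RoundRobinDemo {
    // Returns, for each worker, the list of message numbers (1-based) it receives.
    static List<List<Integer>> assign(int messageCount, int workerCount) {
        List<List<Integer>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (int m = 0; m < messageCount; m++) {
            // Message m goes to worker (m mod workerCount), regardless of task length.
            perWorker.get(m % workerCount).add(m + 1);
        }
        return perWorker;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = assign(10, 3);
        for (int w = 0; w < result.size(); w++) {
            System.out.println("Worker " + (w + 1) + ": " + result.get(w));
        }
        // prints:
        // Worker 1: [1, 4, 7, 10]
        // Worker 2: [2, 5, 8]
        // Worker 3: [3, 6, 9]
    }
}
```

With 10 messages and 3 workers, the first worker receives one message more than the others, which matches the run shown above.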
2. Message acknowledgments
Performing a task takes several seconds. You may wonder what happens if a worker is interrupted partway through a task. In the code above, once RabbitMQ delivers a message to a consumer, it immediately removes the message from memory. In this case, if a worker is killed while performing a task, we lose the message it was processing. We also lose any messages that were already dispatched to this worker but not yet processed.
To see this with the example above, we first start two workers, then run the code that sends the tasks (NewTask.java), and then immediately close the second worker. The result is:
Worker 2:
31054905 [*] Waiting for messages. To exit press CTRL+C
31054905 [x] Received 'helloworld..2'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
Worker 1:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld.1'
18019860 [x] Done
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
18019860 [x] Received 'helloworld.....5'
18019860 [x] Done
18019860 [x] Received 'helloworld.......7'
18019860 [x] Done
18019860 [x] Received 'helloworld.........9'
18019860 [x] Done
It can be seen that the second worker lost at least tasks 6, 8, and 10, and task 4 was not completed.
However, we do not want to lose any tasks (messages). When a worker (consumer) is killed, we want its task to be handed to another worker.
To ensure that messages are not lost, RabbitMQ supports message acknowledgments. The consumer sends an acknowledgment back to RabbitMQ to say that a message has been received and processed, and only then is RabbitMQ free to delete it.
If a consumer is killed without sending an acknowledgment, RabbitMQ assumes that the message was not fully processed and redelivers it to another consumer. This way, you can be sure that no message is lost, even if consumers are occasionally killed.
This mechanism does not rely on a message timeout. RabbitMQ redelivers a message only when the consumer's connection is lost, so it is fine for a consumer to take a particularly long time to process a message.
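To make the bookkeeping concrete, here is a minimal in-memory sketch of the idea: the queue remembers which messages each consumer has received but not yet acknowledged, and puts them back when that consumer disappears. This is a toy model, not the RabbitMQ implementation; the class and method names (AckQueueModel, deliver, ack, consumerDied) are hypothetical, and placing requeued messages at the head of the queue is a simplifying assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of manual acknowledgments; all names are hypothetical.
class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    // Consumer name -> messages delivered to it but not yet acknowledged.
    private final Map<String, List<String>> unacked = new HashMap<>();

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to a consumer and records it as unacknowledged.
    String deliver(String consumer) {
        String message = ready.pollFirst();
        if (message != null) {
            unacked.computeIfAbsent(consumer, k -> new ArrayList<>()).add(message);
        }
        return message;
    }

    // The consumer confirms it has finished; only now is the message forgotten.
    void ack(String consumer, String message) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(message);
        }
    }

    // The consumer's connection was lost: its unacknowledged messages are requeued.
    void consumerDied(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending != null) {
            for (int i = pending.size() - 1; i >= 0; i--) {
                ready.addFirst(pending.get(i));
            }
        }
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckQueueModel queue = new AckQueueModel();
        queue.publish("helloworld..2");
        queue.publish("helloworld....4");

        String first = queue.deliver("worker2");
        queue.ack("worker2", first);    // task 2 finished and confirmed
        queue.deliver("worker2");       // task 4 delivered, never acknowledged
        queue.consumerDied("worker2");  // worker 2 is killed mid-task

        System.out.println(queue.deliver("worker1")); // prints helloworld....4
    }
}
```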
Manual message acknowledgment is on by default. In the code above, we turned this mechanism off by passing autoAck=true to basicConsume. Let's modify the code (Work.java):
boolean ack = false; // Turn on manual acknowledgments
channel.basicConsume(QUEUE_NAME, ack, consumer);
// In addition, send an acknowledgment manually after each message has been processed.
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
The fully modified Work.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
    // Queue name
    private final static String QUEUE_NAME = "workqueue";

    public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
        // Used to tell the output of different worker processes apart
        int hashCode = Work.class.hashCode();
        // Create the connection and channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Specify the queue to consume from, with manual acknowledgments turned on
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(hashCode + " [x] Received '" + message + "'");
            doWork(message);
            System.out.println(hashCode + " [x] Done");
            // Send the acknowledgment
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    // Each dot takes 1 second (unchanged from the first version of Work.java)
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
Test:
We change the number of messages to 5, start two consumers (Work.java), send the tasks (NewTask.java), immediately close one consumer, and observe the output:
[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld.....5'
Worker 2:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld....4'
Worker 1:
31054905 [*] Waiting for messages. To exit press CTRL+C
31054905 [x] Received 'helloworld.1'
31054905 [x] Done
31054905 [x] Received 'helloworld...3'
31054905 [x] Done
31054905 [x] Received 'helloworld.....5'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
31054905 [x] Done
You can see that task 4, which worker 2 did not complete, was redelivered to worker 1 and completed there.
3. Message durability
We have seen that messages are not lost even if a consumer is killed. But if the RabbitMQ service itself stops, our messages are still lost.
When RabbitMQ shuts down or crashes, all queues and messages are lost unless you tell it to keep them. Two things are needed to make sure messages survive: both the queue and the messages must be marked as persistent.
First, we need to make sure that RabbitMQ never loses our queue. To do this, we declare the queue as durable.
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
Note: RabbitMQ does not allow an existing queue to be redeclared with different parameters, so we cannot change the attributes of a queue that already exists. This is why the example declares the durable queue under a different name.
Second, we need to mark our messages as persistent. To do this, pass MessageProperties.PERSISTENT_TEXT_PLAIN (a predefined BasicProperties value) as the properties argument of basicPublish.
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Now you can execute a program that sends messages, then close the service, restart the service, and run the consumer program to do the experiment.
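The two conditions above (a durable queue and persistent messages) can be summarized with a small in-memory model of what survives a restart. This is a deliberately simplified sketch, not how RabbitMQ stores data; the class and method names (DurabilityModel, declareQueue, restart, and so on) are hypothetical, and a real broker has additional nuances that the model ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of what survives a broker restart; all names are hypothetical.
class DurabilityModel {
    private static class StoredMessage {
        final String body;
        final boolean persistent;

        StoredMessage(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private static class QueueState {
        final boolean durable;
        final List<StoredMessage> messages = new ArrayList<>();

        QueueState(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, QueueState> queues = new HashMap<>();

    void declareQueue(String name, boolean durable) {
        queues.putIfAbsent(name, new QueueState(durable));
    }

    void publish(String queue, String body, boolean persistent) {
        QueueState state = queues.get(queue);
        if (state != null) {
            state.messages.add(new StoredMessage(body, persistent));
        }
    }

    // Simulated restart: non-durable queues vanish, and so do non-persistent messages.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (QueueState q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    boolean hasQueue(String name) {
        return queues.containsKey(name);
    }

    int messageCount(String queue) {
        QueueState state = queues.get(queue);
        return state == null ? 0 : state.messages.size();
    }

    public static void main(String[] args) {
        DurabilityModel broker = new DurabilityModel();
        broker.declareQueue("workqueue", false);  // not durable
        broker.declareQueue("task_queue", true);  // durable
        broker.publish("workqueue", "helloworld.1", true);
        broker.publish("task_queue", "helloworld..2", true);   // persistent
        broker.publish("task_queue", "helloworld...3", false); // not persistent
        broker.restart();
        System.out.println(broker.hasQueue("workqueue"));      // prints false
        System.out.println(broker.messageCount("task_queue")); // prints 1
    }
}
```

Only the persistent message in the durable queue survives the simulated restart, which is the outcome the experiment above is meant to demonstrate.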
4. Fair dispatch
You may find that the current dispatch mechanism (round-robin) is not quite what we want. For example, suppose there are two consumers and a series of tasks in which the odd-numbered tasks are very time-consuming and the even-numbered tasks are easy. One consumer is then constantly busy, while the other finishes its tasks quickly and sits idle.
This happens because RabbitMQ dispatches a message as soon as it arrives in the queue. It does not look at how many messages a consumer has received but not yet acknowledged. It just blindly sends all odd-numbered messages to one consumer and all even-numbered messages to the other.
To solve this problem, we can call the basicQos method with prefetchCount=1. This tells RabbitMQ not to give a consumer more than one message at a time. In other words, the next message is sent to a consumer only after it has processed and acknowledged the previous one.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
Note: If all workers are busy, your queue may fill up. You may want to monitor the queue length and then add workers or adopt some other strategy.
Test: Change the sending code so that the number of trailing dots runs from 6 down to 2, start the two workers first, and then send the messages:
[x] Sent 'helloworld......6'
[x] Sent 'helloworld.....5'
[x] Sent 'helloworld....4'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld..2'
Worker 1:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld......6'
18019860 [x] Done
18019860 [x] Received 'helloworld...3'
18019860 [x] Done
Worker 2:
31054905 [*] Waiting for messages. To exit press CTRL+C
31054905 [x] Received 'helloworld.....5'
31054905 [x] Done
31054905 [x] Received 'helloworld....4'
31054905 [x] Done
You can see that this time the messages were not dispatched according to the earlier round-robin mechanism. Instead, each message was sent to a consumer that was not busy. In addition, this mode supports adding consumers dynamically: because messages are not dispatched ahead of time, a newly added consumer starts receiving work immediately. With the default dispatch mechanism, the messages have already been allocated, so a consumer added later cannot take over any of them right away, even when many tasks remain unfinished.
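To see why this matters, the sketch below compares the two dispatch strategies on the odd-heavy, even-light workload described above, using a simple simulation with no broker. It assumes that with prefetchCount=1 each task goes to whichever worker becomes free first; the class and method names (DispatchComparison, roundRobinMakespan, fairMakespan) and the task durations are made up for illustration.

```java
// Toy simulation comparing round-robin with prefetchCount = 1; no broker is involved.
class DispatchComparison {
    // Round-robin: task i is assigned to worker (i mod workerCount) up front.
    // Returns the time at which the busiest worker finishes.
    static int roundRobinMakespan(int[] durations, int workerCount) {
        int[] busyUntil = new int[workerCount];
        for (int i = 0; i < durations.length; i++) {
            busyUntil[i % workerCount] += durations[i];
        }
        return max(busyUntil);
    }

    // Fair dispatch (assumed model of prefetchCount = 1): each task, in queue
    // order, goes to the worker that becomes free earliest.
    static int fairMakespan(int[] durations, int workerCount) {
        int[] busyUntil = new int[workerCount];
        for (int duration : durations) {
            int earliest = 0;
            for (int w = 1; w < workerCount; w++) {
                if (busyUntil[w] < busyUntil[earliest]) {
                    earliest = w;
                }
            }
            busyUntil[earliest] += duration;
        }
        return max(busyUntil);
    }

    private static int max(int[] values) {
        int result = 0;
        for (int v : values) {
            result = Math.max(result, v);
        }
        return result;
    }

    public static void main(String[] args) {
        // Odd-numbered tasks take 9 seconds, even-numbered tasks take 1 second.
        int[] durations = {9, 1, 9, 1, 9, 1};
        System.out.println(roundRobinMakespan(durations, 2)); // prints 27
        System.out.println(fairMakespan(durations, 2));       // prints 19
    }
}
```

Under round-robin, one worker ends up with all three long tasks, so the whole batch takes 27 seconds. In the fair model, the long tasks are spread across both workers and the batch finishes in 19 seconds.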
5. Complete code
NewTask.java
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
    // Queue name
    private final static String QUEUE_NAME = "workqueue_persistence";

    public static void main(String[] args) throws IOException {
        // Create the connection and channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        // 1. Make the queue durable
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        // Send 5 messages, appending 6 down to 2 dots to each message in turn
        for (int i = 5; i > 0; i--) {
            String dots = "";
            for (int j = 0; j <= i; j++) {
                dots += ".";
            }
            String message = "helloworld" + dots + dots.length();
            // 2. Make the message persistent via MessageProperties
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
Work.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work {
    // Queue name
    private final static String QUEUE_NAME = "workqueue_persistence";

    public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
        // Used to tell the output of different worker processes apart
        int hashCode = Work.class.hashCode();
        // Create the connection and channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue as durable
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");
        // Limit the number of unacknowledged messages delivered to this worker at a time
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // Specify the queue to consume from, with manual acknowledgments turned on
        boolean ack = false;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(hashCode + " [x] Received '" + message + "'");
            doWork(message);
            System.out.println(hashCode + " [x] Done");
            // Send the acknowledgment
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

    /**
     * Each dot takes 1 second.
     *
     * @param task the message body
     * @throws InterruptedException if the sleep is interrupted
     */
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
Summary
That covers the Java work queue code discussed in this article. I hope it is helpful. If you find any shortcomings, please leave a comment to point them out.