Scene
Timed tasks are often needed in development. For malls, timed tasks are particularly numerous, such as coupon timed expiration, order timed closing, WeChat payment for 2 hours without paying to close orders, etc., all of which require timed tasks. However, there is a problem with the timing tasks themselves. Generally speaking, we query the database through timed polling to determine whether there are tasks to be executed. That is to say, no matter what, we need to query the database first. Some tasks have high requirements for time accuracy and need to query once a second. It doesn’t matter if the system is small. If the system itself is large and there are many data, this is not very realistic, so other methods are needed. Of course, there are many ways to implement them, such as Redis implementation timing queues, JDK delay queues based on priority queues, time rounds, etc. Because we use Rabbitmq in our project, based on the principle of easy development and maintenance, we use Rabbitmq delay queue to implement timing tasks. If you don’t know what rabbitmq is or how springboot integrates Rabbitmq, you can check my previous article Spring boot integrated RabbitMQ
Rabbitmq delay queue
Rabbitmq itself has no delay queue, and can only be implemented through the characteristics of Rabbitmq's queue. If Rabbitmq wants to implement delay queues, you need to use Rabbitmq's dead-letter switch (Exchange) and message survival time TTL (Time To Live)
Dead letter switch
A message will enter a dead letter switch if the following conditions are met. Remember that this is a switch instead of a queue. A switch can correspond to many queues.
The dead letter switch is an ordinary switch, but because we throw out expired messages, it is called a dead letter switch. It does not mean that the dead letter switch is a specific switch.
Message TTL (Message Survival Time)
The TTL of the message is the survival time of the message. RabbitMQ can set TTL for queues and messages separately. The queue setting means that the queue has no retention time connected to consumers, and you can also make separate settings for each individual message. After this time, we think that the news is dead, and it is called a letter of death. If the queue is set and the message is set, then the small one will be taken. So if a message is routed to a different queue, the time of death of this message may be different (different queue settings). Here we talk about TTL of a single message, because it is the key to implementing delayed tasks.
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);You can set the time by setting the expiration field of the message or the x-message-ttl property, both of which have the same effect. It's just that the expiration field is a string parameter, so you need to write an int-type string: When the above message is thrown into the queue, 60 seconds passed, if it is not consumed, it will die. Will not be consumed by consumers. The news behind this news is not "dead" and is consumed by consumers. Dead letters will not be deleted and released in the queue, they will be counted into the number of messages in the queue.
Process flow chart
Create switches and queues
Create a dead letter switch
As shown in the figure, it is to create an ordinary switch. For the sake of easy distinction, the name of the switch is delay
Create an automatic expiration message queue
The main function of this queue is to make messages expire regularly. For example, if we need to close the order in 2 hours, we need to put the message into this queue and set the message expiration time to 2 hours
Create an automatically expired queue named delay_queue1. Of course, the parameters in the picture will not automatically expire the message, because we do not set the x-message-ttl parameter. If the messages in the entire queue are the same, you can set it. For flexibility, it is not set. The other two parameters x-dead-letter-exchange represent the switch to which the message will enter after the message expires. The configuration here is delay, that is, the dead letter switch. x-dead-letter-routing-key is to configure the routing-key of the dead letter switch after the message expires. The same is true for sending the routing-key of the message. According to this key, the message will be placed in a different queue.
Create a message processing queue
This queue is the queue that truly processes messages, and all messages entering this queue will be processed
The name of the message queue is delay_queue2
Message queue bound to switch
Enter the switch details page and bind the created two queues (delay queue1 and delay queue2) to the switch.
The routing key of the automatic expiration message queue is set to delay
Bind delay queue2
The key of delay queue2 should be set to create an automatically expired queue x-dead-letter-routing-key parameter, so that when the message expires, the message can be automatically placed in the delay_queue2 queue.
The bound management page is as shown in the figure:
Of course, this binding can also be implemented using code, just for intuitive expression, so the management platform used in this article is used to operate
Send a message
String msg = "hello word"; MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay", message);The main code is
messageProperties.setExpiration("6000");Set the message to expire after 6 seconds
Note: Because the message is to be automatically expired, you must not set the listening of delay_queue1, and the messages in this queue cannot be accepted. Otherwise, once the message is consumed, there will be no expiration.
Receive message
Just configure delay_queue2 to listen to receive messages
package wang.raye.rabbitmq.demo1;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configurationpublic class DelayQueue { /** Name of the message switch*/ public static final String EXCHANGE = "delay"; /** Queue key1*/ public static final String ROUTINGKEY1 = "delay"; /** Queue key2*/ public static final String ROUTINGKEY2 = "delay_key"; /** * Configuration link information* @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672); connectionFactory.setUsername("kberp"); connectionFactory.setPassword("kberp"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // Return connectionFactory must be set; } /** * Configure message switch* Configure FanoutExchange for consumers: Distribute messages to all bound queues, without the concept of routingkey HeadersExchange: match DirectExchange by adding attribute key-value: Distribute to the specified queue according to routingkey TopicExchange: Multi-key matching */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); } /** * Configure message queue 2 * Configure */ @Bean public Queue queue() { return new Queue("delay_queue2", true); //Quote persistent} /** * Bind message queue 2 with the switch* Configure for consumers* @return */ @Bean @Autowired public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); } /** * Accept the listener of the message, this listener will accept the message from message queue 1* Configure for consumers* @return */ @Bean @Autowired public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //Set confirmation mode manually confirm container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("delay_queue2 Received the message: " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //Confirm the message to be consumed successfully} }); return container; } }Just handle tasks that need to be processed regularly during message listening. Because Rabbitmq can send messages, you can send the task feature code, such as closing the order and sending the order id, which avoids the need to query those orders that need to be closed and increasing the burden on MySQL. After all, once the order volume is large, query itself is also a very expensive thing.
Summarize
Implementing timing tasks based on Rabbitmq is to set an expiration time for the message, put it into a queue that is not read, so that the message will automatically be transferred to another queue after it expires, and monitor the listener of this queue message to handle the specific operations of the timing tasks.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.