Overview
When I interviewed at NetEase, the interviewer asked me:
After a user places an order but does not pay, the order needs to be cancelled. How would you do that?
I answered that I would use a timed task to scan the DB table. The interviewer was not very satisfied and asked:
Is there a way other than timed tasks to get accurate, real-time notification?
My answer was:
You can use a queue. After the order is placed, send a message to the queue with an expiration time. Once that time arrives, a callback is executed.
After hearing this, the interviewer stopped asking. My idea was actually right, but I did not use the proper term for it: delayed messages.
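To make the idea concrete, here is a minimal in-process sketch of "hand the message over only after its delay has passed". It uses the JDK's java.util.concurrent.DelayQueue as a stand-in, not RabbitMQ, and the class names DelayedOrder and DelayQueueSketch are made up for illustration.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a delayed message (not part of the original article).
final class DelayedOrder implements Delayed {
    final String orderId;
    private final long dueAtNanos;

    DelayedOrder(String orderId, long delayMillis) {
        this.orderId = orderId;
        this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until the element may be taken from the queue.
        return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueSketch {

    // Returns true if an element with the given delay can be polled right after it is queued.
    static boolean availableImmediately(long delayMillis) {
        DelayQueue<DelayedOrder> queue = new DelayQueue<>();
        queue.put(new DelayedOrder("order-1", delayMillis));
        return queue.poll() != null; // poll() returns null while the delay has not expired
    }

    // Queues one order, blocks until its delay expires, and returns the elapsed milliseconds.
    static long enqueueAndAwait(String orderId, long delayMillis) {
        DelayQueue<DelayedOrder> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new DelayedOrder(orderId, delayMillis));
        try {
            DelayedOrder due = queue.take(); // the "callback" point: the order is due now
            System.out.println("Order due for cancellation check: " + due.orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

A DelayQueue lives inside one JVM and loses its content on restart, which is one reason the rest of this article puts the delay into the message broker instead.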
Timed tasks do have a problem. The business requirement is that an order left unpaid for 10 minutes is cancelled immediately and its inventory released. Once the data volume grows, however, querying the unpaid orders takes longer, so some orders are cancelled not at 10 minutes but at 15 or 20 minutes. Inventory is then not released in time, which can also reduce the number of orders placed. With delayed messages, the cancellation can in theory be performed at the configured time.
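The lag of the scanning approach can be estimated with a little arithmetic. The sketch below is a simplified model, not a measurement: it assumes scans start on a fixed schedule beginning at time zero and that an order is cancelled only when the pass that finds it has finished. The class and parameter names are invented for illustration.

```java
import java.time.Duration;

// Simplified model of a periodic DB scan (all names here are hypothetical).
final class ScanLagSketch {

    // How long after its payment deadline an order is actually cancelled.
    // placedAt:     when the order was placed, measured from the first scan
    // timeout:      payment window, e.g. 10 minutes
    // scanInterval: time between the starts of two scans
    // scanDuration: how long one pass over the table takes
    static Duration lagAfterDeadline(Duration placedAt, Duration timeout,
                                     Duration scanInterval, Duration scanDuration) {
        long interval = scanInterval.toMillis();
        if (interval <= 0) {
            throw new IllegalArgumentException("scanInterval must be positive");
        }
        long deadline = placedAt.plus(timeout).toMillis();
        long remainder = deadline % interval;
        // Time until the first scan that starts at or after the deadline.
        long waitForNextScan = (remainder == 0) ? 0 : interval - remainder;
        return Duration.ofMillis(waitForNextScan + scanDuration.toMillis());
    }

    public static void main(String[] args) {
        Duration lag = lagAfterDeadline(Duration.ofMinutes(3), Duration.ofMinutes(10),
                Duration.ofMinutes(5), Duration.ofMinutes(3));
        System.out.println("Cancelled " + lag.toMinutes() + " minutes late");
    }
}
```

In this model, an order placed at minute 3 with a 10-minute window, a 5-minute scan interval, and a 3-minute pass is cancelled 5 minutes late. The longer each pass takes, the larger the lag becomes.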
Currently, most articles online about implementing delayed messages with RabbitMQ rely on RabbitMQ's dead letter queue. That solution looks complicated, and it is usually written against the raw RabbitMQ Client API, which makes it even more verbose.
Spring Boot wraps the RabbitMQ Client API and is much simpler to use. This article explains in detail how to implement delayed messages with the rabbitmq_delayed_message_exchange plugin and Spring Boot.
Software preparation
Erlang
The version used in this article is: Erlang 20.3
RabbitMQ
This article uses the Windows version of RabbitMQ, version 3.7.4.
rabbitmq_delayed_message_exchange plugin
Plugin download address: http://www.rabbitmq.com/community-plugins.html
After opening the URL, press Ctrl+F and search for rabbitmq_delayed_message_exchange.
Be sure to choose the version that matches your RabbitMQ. Since I am using RabbitMQ 3.7.4, the rabbitmq_delayed_message_exchange plugin must also be the 3.7.x build.
If you pick the wrong version, you will run into all kinds of strange problems when using delayed messages, and I could not find solutions for them online. I struggled all night because of this, so please make sure the plugin version is right.
After downloading the plugin, place it in the plugins directory under the RabbitMQ installation directory and enable it with the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
If the plugin is enabled successfully, the following message appears:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
After the plugin is enabled, remember to restart RabbitMQ so that it takes effect.
Integrating RabbitMQ
This is very simple. Just add the following dependency to the pom.xml file of the Maven project:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
I am using Spring Boot 2.0.1.RELEASE.
Next, add the RabbitMQ configuration to the application.properties file:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Define ConnectionFactory and RabbitTemplate
This is also very simple. The code is as follows:
package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host, port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Exchange and Queue configuration
package com.mq.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // Routing behaviour the exchange uses once the delay has elapsed
        args.put("x-delayed-type", "direct");
        // Arguments: name, type, durable, autoDelete, arguments
        return new CustomExchange("test_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue queue() {
        // Durable queue
        Queue queue = new Queue("test_queue_1", true);
        return queue;
    }

    @Bean
    public Binding binding() {
        // The queue name doubles as the routing key
        return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
    }
}

Note that CustomExchange is used here rather than DirectExchange, and the type of the CustomExchange must be x-delayed-message.
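The two strings that matter in this configuration are the exchange type and the x-delayed-type argument. As a small sketch, they could be kept in one place like this. The helper class DelayedExchangeArgs is my own invention for illustration and is not part of Spring AMQP or the plugin.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that collects the declaration details of a delayed exchange.
final class DelayedExchangeArgs {

    // Exchange type registered by the rabbitmq_delayed_message_exchange plugin.
    static final String EXCHANGE_TYPE = "x-delayed-message";

    // Argument that names the routing behaviour applied after the delay.
    static final String ROUTING_TYPE_KEY = "x-delayed-type";

    // Builds the arguments map for the given routing type, e.g. "direct".
    static Map<String, Object> withRoutingType(String routingType) {
        if (routingType == null || routingType.trim().isEmpty()) {
            throw new IllegalArgumentException("routingType must not be blank");
        }
        Map<String, Object> args = new HashMap<>();
        args.put(ROUTING_TYPE_KEY, routingType);
        return args;
    }
}
```

The returned map could then be passed as the last constructor argument of CustomExchange, with EXCHANGE_TYPE as the second argument, in place of the literals used above.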
Implement message sending
package com.mq.rabbitmq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class MessageServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String queueName, String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sending time: " + sdf.format(new Date()));
        // The queue name is used as the routing key
        rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Delay in milliseconds
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

Note that the x-delay header must be added when sending. Its value is in milliseconds.
The delay I set here is 3000, that is, 3 seconds.
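In the order scenario from the overview, the delay would normally be derived from the order's creation time rather than hard-coded. The following is a rough sketch of that calculation. The class DelayHeaderSketch is hypothetical, and the upper bound of 2^32 - 1 milliseconds is what I recall from the plugin's README, so treat that constant as an assumption and verify it against the plugin version you install.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for computing an x-delay value in milliseconds.
final class DelayHeaderSketch {

    // Assumed maximum delay accepted by the plugin (2^32 - 1 ms); verify for your version.
    static final long MAX_DELAY_MILLIS = 4_294_967_295L;

    // Milliseconds left until the order's payment window closes, never negative.
    static long remainingDelayMillis(Instant createdAt, Duration timeout, Instant now) {
        long remaining = Duration.between(now, createdAt.plus(timeout)).toMillis();
        if (remaining < 0) {
            return 0; // deadline already passed: deliver without delay
        }
        return Math.min(remaining, MAX_DELAY_MILLIS);
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.now().minusSeconds(60);
        long delay = remainingDelayMillis(createdAt, Duration.ofMinutes(10), Instant.now());
        System.out.println("x-delay would be about " + delay + " ms");
    }
}
```

The result could replace the fixed 3000 passed to setHeader("x-delay", ...) in sendMsg.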
Message Consumers
package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class MessageReceiver {

    // Invoked once the delay has elapsed and the message reaches test_queue_1
    @RabbitListener(queues = "test_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message reception time: " + sdf.format(new Date()));
        System.out.println("Received message: " + msg);
    }
}

Run the Spring Boot program and send messages
Run the Spring Boot program from its main method. Spring Boot will automatically detect the MessageReceiver class and register its listener.
Next, use JUnit to call the method that sends the message.
package com.mq.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private MessageServiceImpl messageService;

    @Test
    public void send() {
        messageService.sendMsg("test_queue_1", "hello i am delay msg");
    }
}

After running, you can see the following information:
Message sending time: 2018-05-03 12:44:53
After 3 seconds, the Spring Boot console will output:
Message reception time: 2018-05-03 12:44:56
Received message: hello i am delay msg
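If you would rather check the delay in code than by eye, the two printed timestamps can be parsed and compared. This is only a small sketch using java.time with the same pattern as the log output. The class name DelayCheckSketch is made up.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that measures the gap between two logged timestamps.
final class DelayCheckSketch {

    // Same pattern the sender and receiver use for their console output.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Whole seconds between the send time and the reception time.
    static long observedDelaySeconds(String sentAt, String receivedAt) {
        LocalDateTime sent = LocalDateTime.parse(sentAt, FORMAT);
        LocalDateTime received = LocalDateTime.parse(receivedAt, FORMAT);
        return Duration.between(sent, received).getSeconds();
    }

    public static void main(String[] args) {
        System.out.println(observedDelaySeconds("2018-05-03 12:44:53", "2018-05-03 12:44:56"));
    }
}
```

For the timestamps shown above, the difference is 3 seconds, which matches the x-delay value of 3000 ms. The log has one-second resolution, so it cannot show smaller deviations.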