Descripción general
Cuando estaba entrevistando en NetEase, el entrevistador me hizo una pregunta y dijo
Después de realizar un pedido, si el usuario no paga y necesita cancelar el pedido, ¿qué puedo hacer?
Mi respuesta en ese momento era usar una tarea cronometrada para escanear la tabla DB. El entrevistador no estaba muy satisfecho y preguntó:
¿Hay alguna otra forma de usar tareas cronometradas para lograr notificaciones precisas en tiempo real?
Mi respuesta en ese momento fue:
Puedes usar una cola. Después de realizar el pedido, se envía un mensaje a la cola y se especifica el tiempo de vencimiento. Una vez que llega el momento, se ejecuta la interfaz de devolución de llamada.
Después de que el entrevistador escuchó, dejó de preguntar. En realidad, mi idea era correcta en ese momento, pero no era muy profesional. El dicho profesional es usar mensajes retrasados.
De hecho, de hecho, hay algún problema con el uso de tareas cronometradas. El sistema comercial original espera que si el pedido no se paga en 10 minutos, el pedido se cancelará de inmediato y se lanzará el inventario del producto. Sin embargo, una vez que el volumen de datos sea grande, se extenderá el tiempo para obtener datos de pedidos no remunerados. Algunos pedidos se cancelarán después de 10 minutos, que pueden ser de 15 minutos, 20 minutos, etc. De esta manera, el inventario no se lanzará a tiempo y también afectará el número impar. Usando mensajes de retraso, la operación de cancelación de pedidos se puede realizar teóricamente de acuerdo con el tiempo establecido.
Actualmente, la mayoría de los artículos en Internet sobre el uso de RabbitMQ para implementar mensajes retrasados son sobre cómo usar la cola de letras muertas de RabbitMQ para implementar. La solución de implementación parece muy complicada y se implementa utilizando la API del cliente RabbitMQ original, que es aún más detallada.
Spring Boot ha envuelto la API del cliente RabbitMQ, que es mucho más simple de usar. Aquí hay una introducción detallada sobre cómo usar el complemento RabbitMQ_Delayed_Message_Exchange y Spring Boot para implementar mensajes retrasados.
Preparación de software
Erlang
La versión utilizada en este artículo es: Erlang 20.3
Conejo
Este artículo utiliza la versión de la ventana de RabbitMQ, el número de versión es: 3.7.4
complemento de rabbitmq_delayed_message_exchange
Dirección de descarga del complemento: http://www.rabbitmq.com/community-plugins.html
Después de abrir la URL, CTRL + F y Buscar RabbitMQ_Delayed_Message_Exchange.
Recuerde, debe elegir el número de versión. Como estoy usando RabbitMQ 3.7.4, el complemento RabbitMQ_Delayed_Message_Exchange correspondiente también debe elegir 3.7.x.
Si no selecciona la versión correcta, encontrará varios problemas extraños cuando use mensajes retrasados, y no hay solución en Internet. Luché toda la noche debido a este problema. Recuerde seleccionar la versión de complemento correcta.
Después de descargar el complemento, colóquelo en el directorio de complementos en el directorio de instalación de RabbitMQ e inicie el complemento utilizando el siguiente comando:
rabbitmq-plugins habilita rabbitmq_delayed_message_exchange
Si el inicio es exitoso, aparecerá el siguiente mensaje:
Se han habilitado los siguientes complementos: rabbitmq_delayed_message_exchange
Después de que el complemento se inicie con éxito, recuerde reiniciar RabbitMQ para que surja con efecto.
Rabbitmq integrado
Esto es muy simple, solo agrégalo en el archivo pom.xml del proyecto Maven
<Spendency> <MoupRupid> org.springframework.boot </groupid> <artifactID> spring-boot-starter-amqp </artifactid> </pendency>
Estoy usando 2.0.1. Libre para el arranque de primavera.
A continuación, agregue la configuración de Redis en el archivo Application.Properties:
spring.rabbitmq.host = 127.0.0.1spring.rabbitmq.port = 5672spring.rabbitmq.username = invitados
Definir ConnectionFactory y Rabbittsplate
También es muy simple, el código es el siguiente:
paquete com.mq.rabbitmq; import org.springframework.amqp.rabbit.connection.cachingconnectionFactory; import org.springframework.amqp.rabbit.connection.connectionFactory; import og.springfframework.amqp.rabbit.rabbittmplate; import; org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration;@configuration@configurationPreperties (prefix = "spring.rabbitmq") Class de clase pública RABBITURATION;@Configuration@ConfigurationPreperties (prefix = "Spring.rabbitmq") Puerto privado int; nombre de usuario de cadena privada; contraseña de cadena privada; @Bean Public ConnectionFactory ConnectionFactory () {CachingConnectionFactory CachingConnectionFactory = new CachingConnectionFactory (host, puerto); CachingConnectionFactory.setUsername (nombre de usuario); CachingConnectionFactory.setPassword (contraseña); CachingConnectionFactory.SetVirtualHost ("/"); CachingConnectionFactory.setPublisherConfirms (verdadero); return CachingConnectionFactory; } @Bean public rabbittemplate rabbittemplate () {rabbittemplate rabbittemplate = new rabbittemplate (conexiónFactory ()); devolver rabbittemplate; } public String gethost () {return host; } public void sethost (host de cadena) {this.host = host; } public int getPort () {return Port; } public void setport (int puerto) {this.port = puerto; } public String getUsername () {return UserName; } public void setUsername (String UserName) {this.Username = username; } public String getPassword () {return Password; } public void setPassword (String Password) {this.password = contraseña; }}Configuración de intercambio y cola
paquete com.mq.rabbitmq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.bean; importar org.springframework.context.annotation.configuration; import java.hashmap; import java.util.map; @configuration; {@Bean public customExchange DeletExchange () {MAP <String, Object> args = new HashMap <> (); args.put ("X-Delayed-Type", "Direct"); devolver nuevo customExchange ("test_exchange", "x-delayed-message", verdadero, falso, args); } @Bean public Queue Queue () {Queue Queue = new Queue ("test_queue_1", true); cola de devolución; } @Bean public Binding Binding () {return bindingBuilder.bind (queue ()). To (demoraxchange ()). Con ("test_queue_1"). Noargs (); }}Cabe señalar aquí que se usa CustomExchange, no DirectExchange, y el tipo de customExchange debe ser un mensaje de retraso X.
Implementar el envío de mensajes
paquete com.mq.rabbitmq; import org.springframework.amqp.amqpexception; import org.springframework.amqp.core.message; import og.springframework.amqp.core.messagePostProcessor; import org.springframework.beans.factory.annotation.aUtowired; import org.springframework.stereotype.service; import java.text.simpleDateFormat; import java.util.date; @ServicePublic Message MessageServicePeMervicePlapplate; public void sendmsg (string queueName, string msg) {SimpleDateFormat sdf = new SimpleDateFormat ("yyyyy-mm-dd hh: mm: ss"); System.out.println ("Mensaje Enviar tiempo:"+SDF.Format (new Date ())); rabbittemplate.convertAndSend ("test_exchange", queueName, msg, new MessagePostProcessor () {@Override public Message PostProcessMessage (Mensaje Mensaje) lanza AMQPException {Message.getMessageProperties (). Setheader ("X-DeLay", 3000); Mensaje de retorno;}}); }}Tenga en cuenta que al enviar, se debe agregar un encabezado
X-Delay
El tiempo de retraso que establecí aquí es de 3 segundos.
Consumidores de mensajes
paquete com.mq.rabbitmq; import org.springframework.amqp.rabbit.annotation.rabbithandler; import org.springframework.amqp.rabbit.annotation.rabbitListener; import og.springFrameWork.stereType.comPonent; import.text.simpledate java.util.date; @ComponentPublic Class MessagerEceiver {@RabitListener (queues = "test_queue_1") public void recibe (string msg) {simpledateFormat sdf = new SimpleDateFormat ("yyyyy-mm-dd hh: mm: ss"); System.out.println ("Tiempo de recepción de mensajes:"+sdf.format (new Date ())); System.out.println ("Mensaje recibido:"+msg); }}Ejecutar el programa Spring Boot y enviar mensajes
Ejecute el programa Spring Boot directamente en el método principal, y Spring Boot analizará automáticamente la clase MessagerEceiver.
A continuación, use Junit para ejecutar la interfaz que envía el mensaje.
paquete com.mq.rabbitmq; import org.junit.test; import org.junit.runner.runwith; import org.springframework.beans.factory.annotation.aUtowired; import org.springframework.boot.test.context.springboottest; import; org.springframework.test.context.junit4.springrunner; @runwith (springrunner.class) @springboottestpublic class rabbitmqapplicationTests {@aUtowired MessageServiceIm MessagesPl Oferta @Test public void send () {MessageService.sendmsg ("test_queue_1", "Hola, soy demora msg"); }} Después de ejecutar, puede ver la siguiente información:
Tiempo de envío de mensajes: 2018-05-03 12:44:53
Después de 3 segundos, la consola de arranque de primavera saldrá:
Hora de recepción de mensajes: 2018-05-03 12:44:56
Mensaje recibido: Hola, soy un msg de retraso
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.