Escena
Las tareas cronometradas a menudo se necesitan en el desarrollo. Para los centros comerciales, las tareas cronometradas son particularmente numerosas, como la expiración cronometrada de cupones, el cierre de la orden, el cierre de la transmisión, el pago de WeChat durante 2 horas sin pagar para cerrar pedidos, etc., todos los cuales requieren tareas cronometradas. Sin embargo, hay un problema con las tareas de tiempo en sí. En términos generales, consultamos la base de datos a través de la encuesta cronometrada para determinar si hay tareas que se ejecutarán. Es decir, pase lo que pase, primero debemos consultar la base de datos. Algunas tareas tienen altos requisitos para la precisión del tiempo y necesitan consultar una vez por segundo. No importa si el sistema es pequeño. Si el sistema en sí es grande y hay muchos datos, esto no es muy realista, por lo que se necesitan otros métodos. Por supuesto, hay muchas maneras de implementarlos, como las colas de tiempo de implementación de Redis, las colas de retraso de JDK basadas en colas de prioridad, rondas de tiempo, etc. Debido a que usamos RabbitMQ en nuestro proyecto, basado en el principio de desarrollo y mantenimiento fácil, utilizamos la cola de retraso de RabbitMQ para implementar tareas de tiempo. Si no sabe qué es RabbitMQ o cómo Springboot integra RabbitMQ, puede consultar mi artículo anterior Boot Spring Boot Integrated RabbitMQ
Cola de retraso de conejos
RabbitMQ en sí no tiene una cola de retraso, y solo se puede implementar a través de las características de la cola de RabbitMQ. Si RabbitMQ quiere implementar las colas de retraso, debe usar el interruptor de letra muerta (intercambio) de RabbitMQ y el tiempo de supervivencia de mensajes TTL (Tiempo para vivir)
Interruptor de letras muertas
Un mensaje ingresará un interruptor de letra muerta si se cumplen las siguientes condiciones. Recuerde que este es un interruptor en lugar de una cola. Un interruptor puede corresponder a muchas colas.
El interruptor de la letra muerta es un interruptor ordinario, pero debido a que tiramos mensajes caducados, se llama un interruptor de letra muerta. No significa que el interruptor de letras muertas sea un interruptor específico.
Mensaje TTL (Tiempo de supervivencia del mensaje)
El TTL del mensaje es el tiempo de supervivencia del mensaje. RabbitMQ puede establecer TTL para colas y mensajes por separado. La configuración de la cola significa que la cola no tiene tiempo de retención conectado a los consumidores, y también puede hacer una configuración separada para cada mensaje individual. Después de este tiempo, creemos que la noticia está muerta, y se llama una carta de muerte. Si la cola está configurada y el mensaje está configurado, se tomará el pequeño. Entonces, si un mensaje se enruta a una cola diferente, el tiempo de muerte de este mensaje puede ser diferente (configuración de cola diferente). Aquí hablamos de TTL de un solo mensaje, porque es la clave para implementar tareas retrasadas.
byte [] messageBodyBytes = "¡Hola, mundo!". GetBytes (); Amqp.basicProperties Properties = new Amqp.BasicProperties (); Properties.setExpiration ("60000"); Channel.BasicPublish ("My-Exchange", "Queue-Key", Propiedades, MessageBodyBytes);Puede establecer el tiempo estableciendo el campo de vencimiento del mensaje o la propiedad X-Message-TTL, los cuales tienen el mismo efecto. Es solo que el campo de vencimiento es un parámetro de cadena, por lo que debe escribir una cadena de tipo int: cuando el mensaje anterior se arroje a la cola, pasan 60 segundos, si no se consume, morirá. No será consumido por los consumidores. La noticia detrás de esta noticia no está "muerta" y es consumida por los consumidores. Las letras muertas no serán eliminadas y liberadas en la cola, se contarán en el número de mensajes en la cola.
Gráfico de flujo de proceso
Crear interruptores y colas
Crear un interruptor de letra muerta
Como se muestra en la figura, es crear un interruptor ordinario. En aras de la distinción fácil, el nombre del interruptor es el retraso
Crear una cola de mensajes de vencimiento automático
La función principal de esta cola es hacer que los mensajes caducen regularmente. Por ejemplo, si necesitamos cerrar el pedido en 2 horas, debemos poner el mensaje en esta cola y establecer el tiempo de vencimiento del mensaje en 2 horas
Cree una cola expirada automáticamente llamada Delay_queue1. Por supuesto, los parámetros en la imagen no expirarán automáticamente el mensaje, porque no establecemos el parámetro X-Message-TTL. Si los mensajes en toda la cola son los mismos, puede configurarlos. Para flexibilidad, no está configurado. Los otros dos parámetros X-Dead-Letter-Exchange representan el conmutador al que ingresará el mensaje después de que expire el mensaje. La configuración aquí es el retraso, es decir, el interruptor de letra muerta. X-Dead-Letter-Ruting-Key es configurar la tecla de enrutamiento del interruptor de letra muerta después de que expire el mensaje. Lo mismo es cierto para enviar la tecla de enrutamiento del mensaje. Según esta clave, el mensaje se colocará en una cola diferente.
Crear una cola de procesamiento de mensajes
Esta cola es la cola que realmente procesa los mensajes, y todos los mensajes que ingresan a esta cola se procesarán
El nombre de la cola de mensajes es Delay_queue2
Cola de mensajes vinculada a cambiar
Ingrese la página Detalles del interruptor y vincule las dos colas creadas (retraso en cola1 y retraso quee2) al conmutador.
La clave de enrutamiento de la cola de mensajes de vencimiento automático se establece para retrasar
Retraso de retraso de enlace2
La clave de la cola de retraso2 debe configurarse para crear un parámetro de tecla de letrero de la letra X-freead-letrero de la cola expirada expirada automáticamente, de modo que cuando el mensaje expire, el mensaje se puede colocar automáticamente en la cola de retraso_queue2.
La página de administración vinculada es como se muestra en la figura:
Por supuesto, este enlace también se puede implementar utilizando el código, solo para la expresión intuitiva, por lo que la plataforma de administración utilizada en este artículo se utiliza para operar
Enviar un mensaje
Cadena msg = "Hello Word"; MessageProperties MessageProperties = new MessageProperties (); MessageProperties.SetExpiration ("6000"); MessageProperties.SetCorrelationId (uuid.randomuuid (). toString (). getBytes ()); Mensaje mensaje = nuevo mensaje (msg.getBytes (), messageProperties); rabbittemplate.convertandsend ("retraso", "retraso", mensaje);El código principal es
MessageProperties.SetExpiration ("6000");Establecer el mensaje para expirar después de 6 segundos
Nota: Debido a que el mensaje debe expirarse automáticamente, no debe establecer la escucha de Delay_queue1, y los mensajes en esta cola no se pueden aceptar. De lo contrario, una vez que se consuma el mensaje, no habrá vencimiento.
Mensaje de recibir
Simplemente configure el retraso_queue2 para escuchar los mensajes de recibir
paquete wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.acknowledgemode; importar org.springframework.amqp.core.binding; importar org.springframework.amqp.core.binding; importar org.springframework.amqp.core.bindingbuilder; importar org.springframework.amqp.core.directexchange; importar org.springframework.amqp.core.message; importar org.springframework.amqp.core.queue; importar org.springframework.amqp.rabbit.connection.cachingConnectionFactory; importar org.springframework.amqp.rabbit.connection.connectionFactory; importar org.springframework.amqp.rabbit.core.channelawaremessageListener; importar org.springframework.amqp.rabbit.listener.simplemessageListenerContainer; importar org.springframework.beans.factory.annotation.aUtowired; importar org.springframework.context.annotation.bean; importar org.springframework.context.annotation.configuration; @ConfigurationPublic Class Delayqueue { / ** Nombre del conmutador de mensaje* / public static final String Exchange = "Delay"; / ** Key1 de cola**/ public static final String RoutingKey1 = "Retraso"; / ** Key2 de cola*/ public static final String RoutingKey2 = "Delay_key"; / *** Información del enlace de configuración* @return*/ @bean public una conexiónFactory ConnectionFactory () {CachingConnectionFactory ConnectionFactory = new CachingConnectionFactory ("120.76.237.8", 5672); ConnectionFactory.SetUsername ("Kberp"); ConnectionFactory.setPassword ("kberp"); ConnectionFactory.SetVirtualHost ("/"); ConnectionFactory.setPublisherConFirms (verdadero); // debe establecerse ConnectionFactory; } / *** Configurar el interruptor de mensaje* Configurar FanOutExchange para los consumidores: distribuya mensajes a todas las colas vinculadas, sin el concepto de enrutamiento de llave de llave, Cambio de encabezado: Match DirectExchange agregando atributo clave de atributo-Value: distribuir a la cola especificada según } / ** * Configurar la cola de mensajes 2 * Configurar * / @Bean Public Queue Queue () {return New Queue ("Delay_queue2", true); // CITA Persistente}/*** Bind Message Queue 2 con el Switch* Configurar para los consumidores* @return*/@Bean @AUTOWIREDED Public Binding Binding () {return bindingBuilder.bind (queue ()). To (defaultExchange ()). Con (demandar.RoutingKey2); } / *** Acepte al oyente del mensaje, este oyente aceptará el mensaje de la cola de mensajes 1* Configurar para los consumidores* @return* / @bean @aUtowired public simpleMessageListenercontainer MessageContainer2 (ConnectionFactoryFactory) {SimpleSesSageListEnperContainer Contenedor = new SimpleSessAsteReNINERER (ConexyFactory ();););););););)););));) contenedor.setqueues (queue ()); Container.SetExPosElistenerChannel (verdadero); Container.SetMaxConcurrentConsumers (1); contenedor.setConcurrentConsumers (1); Container.SetAcknowledgemode (reconocer elGemode.Manual); // Establecer el modo de confirmación Confirmar manualmente contenedor.setMessageListener (New ChannelAwarEmessAgelistener () {public void onMessage (mensaje de mensaje, com.rabbitmq.client.channel canal) lanza excepción {byte [] body = message.getBody (); system.out.println ("retrasar_queue 2 recibió el mensaje:" + nueva cadena (cuerpo (cuerpo)); Channel.Basicack (Message.getMessageProperties (). GetDeliveryTag (), falso); contenedor de devolución; }}Simplemente maneje las tareas que deben procesarse regularmente durante la escucha de mensajes. Debido a que RabbitMQ puede enviar mensajes, puede enviar el código de característica de la tarea, como cerrar el pedido y enviar el ID de pedido, lo que evita la necesidad de consultar aquellos pedidos que deben cerrarse y aumentar la carga en MySQL. Después de todo, una vez que el volumen del pedido es grande, la consulta en sí también es algo muy costoso.
Resumir
Implementar tareas de tiempo basadas en RabbitMQ es establecer un tiempo de vencimiento para el mensaje, ponerlo en una cola que no se lea, de modo que el mensaje se transfiera automáticamente a otra cola después de que expire, y monitoree el oyente de este mensaje de cola para manejar las operaciones específicas de las tareas de sincronización.
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.