Scène
Des tâches chronométrées sont souvent nécessaires en développement. Pour les centres commerciaux, les tâches chronométrées sont particulièrement nombreuses, telles que l'expiration chronométrée du coupon, la fermeture chronométrée de la commande, le paiement de WeChat pendant 2 heures sans payer pour fermer les commandes, etc., qui nécessitent tous des tâches chronométrées. Cependant, il y a un problème avec les tâches de synchronisation elles-mêmes. D'une manière générale, nous interrogeons la base de données par le biais de sondage chronométré pour déterminer s'il y a des tâches à exécuter. C'est-à-dire, quoi qu'il arrive, nous devons d'abord interroger la base de données. Certaines tâches ont des exigences élevées pour la précision du temps et doivent être interrogées une fois par seconde. Peu importe si le système est petit. Si le système lui-même est grand et qu'il existe de nombreuses données, ce n'est pas très réaliste, donc d'autres méthodes sont nécessaires. Bien sûr, il existe de nombreuses façons de les mettre en œuvre, telles que les files d'attente de synchronisation de la mise en œuvre de Redis, les files d'attente de retard JDK en fonction des files d'attente de priorité, des tours de temps, etc. Parce que nous utilisons RabbitMQ dans notre projet, en fonction du principe du développement et de la maintenance faciles, nous utilisons la file d'attente RabbitMQ pour mettre en œuvre des tâches de synchronisation. Si vous ne savez pas ce qu'est Rabbitmq ou comment Springboot intègre Rabbitmq, vous pouvez consulter mon précédent article Spring Boot Integrated Rabbitmq
Rabbitmq Delay Fitre
Rabbitmq lui-même n'a pas de file d'attente de retard et ne peut être mis en œuvre que par les caractéristiques de la file d'attente de Rabbitmq. Si RabbitMQ veut mettre en œuvre des files d'attente de retard, vous devez utiliser le commutateur de la lettre d'adaptation de RabbitMQ (échange) et le temps de survie des messages TTL (temps pour vivre)
Commutateur de lettre mort
Un message entrera un commutateur de lettre mort si les conditions suivantes sont remplies. N'oubliez pas qu'il s'agit d'un commutateur au lieu d'une file d'attente. Un commutateur peut correspondre à de nombreuses files d'attente.
L'interrupteur de lettres morts est un interrupteur ordinaire, mais parce que nous jetons des messages expirés, il est appelé un commutateur de lettre mort. Cela ne signifie pas que l'interrupteur de lettres morts est un commutateur spécifique.
Message TTL (temps de survie du message)
Le TTL du message est le temps de survie du message. RabbitMQ peut définir TTL pour les files d'attente et les messages séparément. Le paramètre de file d'attente signifie que la file d'attente n'a pas de temps de rétention connecté aux consommateurs, et vous pouvez également créer des paramètres distincts pour chaque message individuel. Après cette période, nous pensons que les nouvelles sont mortes, et cela s'appelle une lettre de mort. Si la file d'attente est définie et que le message est défini, le petit sera pris. Donc, si un message est acheminé vers une file d'attente différente, le moment de la mort de ce message peut être différent (différents paramètres de file d'attente). Ici, nous parlons de TTL d'un seul message, car c'est la clé de la mise en œuvre de tâches retardées.
BYTE [] MessageBodyBytes = "Hello, World!". GetBytes (); AMQP.BasicProperties Properties = new AMQP.BasicProperties (); Properties.SetExpiration ("60000"); Channel.BasicPublish ("My-Exchange", "Queue-Key", Properties, MessageBodyBytes);Vous pouvez définir l'heure en définissant le champ d'expiration du message ou de la propriété X-Message-TTL, qui ont tous deux le même effet. C'est juste que le champ d'expiration est un paramètre de chaîne, vous devez donc écrire une chaîne de type int: lorsque le message ci-dessus est jeté dans la file d'attente, 60 secondes passent, si elle n'est pas consommée, elle mourra. Ne sera pas consommé par les consommateurs. Les nouvelles derrière cette nouvelle ne sont pas "mortes" et sont consommées par les consommateurs. Les lettres mortes ne seront pas supprimées et libérées dans la file d'attente, elles seront comptées dans le nombre de messages dans la file d'attente.
Tableau de flux de processus
Créer des commutateurs et des files d'attente
Créer un commutateur de lettre mort
Comme le montre la figure, il s'agit de créer un commutateur ordinaire. Dans un souci de distinction facile, le nom du commutateur est de retard
Créer une file d'attente de messages d'expiration automatique
La fonction principale de cette file d'attente est de faire expirer les messages régulièrement. Par exemple, si nous devons fermer la commande en 2 heures, nous devons mettre le message dans cette file d'attente et définir le temps d'expiration du message à 2 heures
Créez une file d'attente expirée automatiquement nommée de retard_queue1. Bien sûr, les paramètres de l'image n'expireront pas automatiquement le message, car nous ne définissons pas le paramètre X-Message-TTL. Si les messages de toute la file d'attente sont les mêmes, vous pouvez le définir. Pour la flexibilité, il n'est pas défini. Les deux autres paramètres x-lead-letter-échange représentent le commutateur auquel le message entrera après l'expiration du message. La configuration ici est un délai, c'est-à-dire le commutateur de lettre mort. X-Dead-Letter-Routing-Key consiste à configurer la touche de routage du commutateur de lettre mort après l'expiration du message. Il en va de même pour l'envoi de la touche de routage du message. Selon cette clé, le message sera placé dans une autre file d'attente.
Créer une file d'attente de traitement des messages
Cette file d'attente est la file d'attente qui traite vraiment les messages, et tous les messages entrant dans cette file d'attente seront traités
Le nom de la file d'attente de messages est delay_queue2
La file d'attente de messages est liée à la commutation
Entrez la page Détails du commutateur et liez les deux files d'attente créées (Delay Queue1 et Delay Queue2) au commutateur.
La clé de routage de la file d'attente de messages d'expiration automatique est définie pour retarder
Lier la file d'attente de retard2
La clé de la file d'attente de retard doit être définie pour créer un paramètre de touche de la file d'attente automatique à expiration automatique, de façon automatique, de sorte que lorsque le message expire, le message peut être automatiquement placé dans la file d'attente de retard_queue2.
La page de gestion liée est comme indiqué sur la figure:
Bien sûr, cette liaison peut également être implémentée à l'aide du code, juste pour une expression intuitive, de sorte que la plate-forme de gestion utilisée dans cet article est utilisée pour fonctionner
Envoyer un message
String msg = "Hello Word"; MessageProperties MessageProperties = new MessageProperties (); MessageProperties.SetExpiration ("6000"); MessageProperties.SetCorrelationId (uUID.RandomuUid (). ToString (). GetBytes ()); Message Message = nouveau message (msg.getBytes (), MessageProperties); RabbitTemplate.ConvertandSend ("Delay", "Delay", Message);Le code principal est
MessageProperties.SetExpiration ("6000");Définissez le message pour expirer après 6 secondes
Remarque: Parce que le message doit être automatiquement expiré, vous ne devez pas définir l'écoute de Delay_queue1 et les messages de cette file d'attente ne peuvent pas être acceptés. Sinon, une fois le message consommé, il n'y aura pas d'expiration.
Recevoir un message
Configurez simplement Delay_queue2 pour écouter pour recevoir des messages
package wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.ackNowledGemode; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingBuilder; import org.springframework.amqp.core.directExchange; import org.springframework.amqp.core.sessage; import org.springframework.amqp.core.queue; import org.springframework.amqp.rabbit.connection.cachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelawareMessageListener; import org.springframework.amqp.rabbit.Listener.SimplessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configurationpublic classe delayqueue {/ ** nom du commutateur de message * / public static final string échange = "rythme"; / ** file d'attente Key1 * / public static final string routingKey1 = "Delay"; / ** file d'attente Key2 * / public static final string routingKey2 = "Delay_key"; / ** * Informations de liaison de configuration * @return * / @Bean public ConnectionFactory ConnectionFactory () {CachingConnectionFactory ConnectionFactory = new CachingConnectionFactory ("120.76.237.8", 5672); ConnectionFactory.SetUserName ("Kberp"); ConnectionFactory.SetPassword ("Kberp"); ConnectionFactory.SetVirtualHost ("/"); ConnectionFactory.SetPublisherConFirms (true); // return ConnectionFactory doit être défini; } / ** * Configurer le commutateur de messages * Configurer Fanoutexchange pour les consommateurs: Distribuez des messages à toutes les files d'attente liées, sans le concept de RoutingKey HeadersExchange: Match DirectExChange en ajoutant l'attribut Key-Value: Distribuez à la file d'attente spécifiée selon RoutingKey TopicexChange: Multi-Key Matching * / @Bean Public DirectExchanged; } / ** * Configurer la file d'attente de messages 2 * Configurer * / @Bean Public Queue Queue () {return new Queue ("Delay_queue2", true); // cite persistant} / ** * lier la file d'attente de messages 2 avec le commutateur * Configurer pour les consommateurs * @return * / @bean @autowired public lising binding () {return binkingBuilder.bind (queue ()). To (DefthelExchange ()). Avec (delayQueue.RoutingKey2); } / ** * Acceptez l'auditeur du message, cet auditeur acceptera le message de la file d'attente de message 1 * Configurez pour les consommateurs * @return * / @bean @autowired public SimpleMessageListenCainer MessageContainer2 (ConnectionFactory ConnectionFactory) {SimpleMessageListenConainer Container = New SimpleMessageAlListendernener (ConnectionFactory); contener.setQueues (queue ()); contener.setExposeListEnerChannel (true); contener.setMaxConcurrentConsumers (1); contener.setConcurrentConsumers (1); contener.setackNowledGemode (remobledGemode.manual); // Définir le mode de confirmation Confirmer manuellement Container.SetMessageListener (new ChannelAwareMessageListener () {public void OnMessage (message de message, com.rabbitmq.client.channel canal) lève exception {byte [] body = message.getbody (); system.out.println ("delay_queue2 a reçu le message:" + new String (body); channel.basicack (message.getMessageProperties (). getDeliveryTag (), false); conteneur de retour; }}Gérez simplement les tâches qui doivent être traitées régulièrement lors de l'écoute des messages. Étant donné que RabbitMQ peut envoyer des messages, vous pouvez envoyer le code de fonctionnalité de la tâche, tel que la fermeture de la commande et l'envoi de l'ID de commande, ce qui évite la nécessité d'interroger les commandes qui doivent être fermées et d'augmenter la charge sur MySQL. Après tout, une fois que le volume de commande est grand, la requête elle-même est également une chose très coûteuse.
Résumer
La mise en œuvre des tâches de synchronisation basées sur RabbitMQ consiste à définir un temps d'expiration pour le message, à la mettre dans une file d'attente qui n'est pas lue, afin que le message soit automatiquement transféré dans une autre file d'attente après son expiration et surveiller l'auditeur de ce message de file d'attente pour gérer les opérations spécifiques des tâches de synchronisation.
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.