Сцена
Временные задачи часто нужны в разработке. Для торговых центров особенно многочисленные задачи, такие как истечение срока действия купонов, закрытие приказа, выплата WeChat в течение 2 часов без уплаты закрытия заказов и т. Д., Все из которых требуют временных задач. Тем не менее, есть проблема с самими задачами времени. Вообще говоря, мы запрашиваем базу данных посредством времени, чтобы определить, есть ли задачи, которые должны быть выполнены. То есть, несмотря ни на что, нам нужно сначала запросить базу данных. Некоторые задачи имеют высокие требования к точности времени и необходимо запросить один раз за секунду. Неважно, небольшая система. Если сама система большая и есть много данных, это не очень реалистично, поэтому необходимы другие методы. Конечно, существует много способов их реализации, таких как очередь времени реализации REDIS, очереди задержки JDK, основанные на очереди приоритетных, временных раундах и т. Д. Поскольку мы используем RabbitMQ в нашем проекте, на основе принципа легкой разработки и технического обслуживания, мы используем очередь задержки RabbitMQ для выполнения задач времена. Если вы не знаете, что такое Rabbitmq или как Springboot интегрирует Rabbitmq, вы можете проверить мою предыдущую статью Spring Boot Integrated Rabbitmq
Rabbitmq queue
Сам RabbitMQ не имеет очереди задержки и может быть реализован только с помощью характеристик очереди RabbitMQ. Если Rabbitmq хочет реализовать очереди задержки, вам необходимо использовать мертвый платежный коммутатор Rabbitmq (Exchange) и время выживания сообщений TTL (время для жизни)
Переключатель мертвого письма
Сообщение введет переключатель мертвой буквы, если будут выполнены следующие условия. Помните, что это переключатель вместо очереди. Переключатель может соответствовать многим очереди.
Переключатель мертвой буквы - это обычный переключатель, но поскольку мы выбрасываем истекшие сообщения, он называется переключателем мертвой буквы. Это не означает, что переключатель мертвой буквы - это конкретный переключатель.
Сообщение TTL (время выживания сообщения)
TTL сообщения - время выживания сообщения. RabbitMQ может установить TTL для очередей и сообщений отдельно. Настройка очереди означает, что в очереди нет времени удержания, подключенного к потребителям, и вы также можете создавать отдельные настройки для каждого отдельного сообщения. После этого мы думаем, что новости мертвы, и это называется смертью. Если очередь установлена и будет установлено сообщение, то маленький будет взят. Таким образом, если сообщение направляется в другую очередь, время смерти этого сообщения может быть другим (разные настройки очереди). Здесь мы говорим о TTL одного сообщения, потому что это ключ к реализации отсроченных задач.
byte [] messagebodybytes = "Привет, мир!". Getbytes (); Amqp.basicproperties Properties = new Amqp.BasicProperties (); Properties.SetExpiration ("60000"); channel.basicpublish ("my-exchange", "queue-key", свойства, сообщения boybodytes);Вы можете установить время, установив поле истечения срока действия сообщения или свойство X-Message-TTL, оба из которых имеют одинаковый эффект. Просто поле истечения срока действия является струнным параметром, поэтому вам нужно написать строку Int-Type: когда вышеупомянутое сообщение будет добавлено в очередь, 60 секунд прошло, если она не будет потреблена, оно умрет. Не будет употреблять потребители. Новости этой новости не являются «мертвыми» и потребляются потребителями. Мертвые буквы не будут удалены и выпущены в очереди, они будут подсчитаны количество сообщений в очереди.
Процесс схема потока
Создать переключатели и очереди
Создать переключатель мертвой буквы
Как показано на рисунке, он должен создать обычный переключатель. Ради легкого различия, название переключателя задержка
Создать очередь автоматического истечения срока годности
Основная функция этой очереди - регулярно делать сообщения. Например, если нам нужно закрыть заказ через 2 часа, нам нужно поместить сообщение в эту очередь и установить время истечения срока действия сообщения до 2 часов
Создайте автоматическую очередь с истекшим сроком действия с именем с именем Delay_Queue1. Конечно, параметры на рисунке не истекают автоматически, потому что мы не устанавливаем параметр X-Message-TTL. Если сообщения во всей очереди одинаковы, вы можете установить их. Для гибкости это не установлено. Два других параметра x-dead-letter-exchange представляют переключатель, в который будет входить сообщение после истечения срока действия сообщения. Конфигурация здесь - задержка, то есть переключатель мертвой буквы. x-dead-letter-rout-key предназначен для настройки ключа маршрутизации переключателя мертвой буквы после истечения срока действия сообщения. То же самое верно для отправки ключа маршрутизации сообщения. Согласно этому ключу, сообщение будет помещено в другую очередь.
Создать очередь обработки сообщений
Эта очередь - это очередь, которая действительно обрабатывает сообщения, и все сообщения, входящие в эту очередь, будут обработаны
Имя очереди сообщений - dower_queue2
Очередь сообщений, связанная с переключением
Введите страницу сведений о коммутаторе и связывайте созданные две очереди ( очередь задержки1 и Queue2 задержки ) с переключателем.
Ключ маршрутизации в очереди на автоматическую срок годности установлен на задержку
Связывать очередь задержки2
Ключ из очереди задержки 2 должен быть установлен для создания автоматического истекшего срока очереди X-Dead-Retter-Routing-ключа, так что, когда сообщение истекает, сообщение может быть автоматически размещено в очереди Dolement_queue2.
Страница управления гранием, как показано на рисунке:
Конечно, это привязка также может быть реализована с использованием кода, только для интуитивного выражения, поэтому платформа управления, используемая в этой статье, используется для работы
Отправить сообщение
String msg = "Hello Word"; MessageProperties MessageProperties = new MessageProperties (); MessageProperties.SetExpiration ("6000"); messageProperties.setCorrelationId (uuid.randomuuid (). ToString (). GetBytes ()); Сообщение сообщения = новое сообщение (msg.getbytes (), messageProperties); rabbittemplate.convertandsend («задержка», «задержка», сообщение);Основной код
MessageProperties.SetExpiration ("6000");Установите сообщение на срок действия истечения через 6 секунд
ПРИМЕЧАНИЕ. Поскольку сообщение должно быть автоматически истек, вы не должны устанавливать прослушивание dolement_queue1, а сообщения в этой очереди не могут быть приняты. В противном случае, как только сообщение будет использовано, истечения срока действия не будет.
Получить сообщение
Просто настройте Dolement_queue2, чтобы прослушать получение сообщений
пакет wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.acknowledgemode; Импорт org.springframework.amqp.core.binding; Импорт org.springframework.amqp.core.binding; Импорт org.springframework.amqp.core.bindingBuilder; Импорт org.springframework.amqp.core.directexchange; Импорт org.springframework.amqp.core.message; Импорт org.springframework.amqp.core.queue; Импорт org.springframework.amqp.rabbit.connection.cachingConnectionFactory; Импорт org.springframework.amqp.rabbit.connection.connectionFactory; Импорт org.springframework.amqp.rabbit.core.ChannelAwareMessAgeListener; Импорт org.springframework.amqp.rabbit.listener.simplemessageListenerContainer; Импорт org.springframework.beans.factory.annotation.autowired; Импорт org.springframework.context.annotation.bean; Import org.springframework.context.annotation.configuration; @ConfigurationPublic Class quidqueue { / ** Имя переключателя сообщения* / public Static Final String Exchange = "Delay"; / ** queue Key1*/ public Static Final String RoutingKey1 = "Delay"; / ** queue key2*/ public static final String RoutingKey2 = "dower_key"; / *** Информация о ссылке конфигурации* @return*/ @bean public connectory connectory connectionfactory () {cachingConnectionFactory ConnectionFactory = new CachingConnectionFactory ("120.76.237.8", 5672); connectionFactory.SetUSERNAME ("kberp"); connectionFactory.SetPassword ("kberp"); ConnectionFactory.SetVirtualHost ("/"); ConnectionFactory.SetPublisherConfirms (True); // return ConnectionFactory должно быть установлено; } /** * Configure message switch* Configure FanoutExchange for consumers: Distribute messages to all bound queues, without the concept of routingkey HeadersExchange: match DirectExchange by adding attribute key-value: Distribute to the specified queue according to routingkey TopicExchange: Multi-key matching */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); } / ** * Настроить очередь сообщений 2 * configure * / @bean public queue queue () {return new queue ("dower_queue2", true); // Цитата Постоянная}/*** Связанная очередь сообщений 2 с помощью переключателя* Настройка для потребителей* @return*/@bean @autowired public swinting привязка () {return bindingbuilder.bind (queue ()). } / *** Примите слушатель сообщения, этот слушатель примет сообщение из очереди сообщений 1* Настройка для потребителей* @return* / @bean @autowired public summeressageListenerContainer MessageContainer2 (connectionFactory connectoryFactory) {coneyessAgelistenerContainer incepaner = new SomelemessAgeListenerAner (); Container.SetQueues (queue ()); container.setExposelistenerChannel (true); контейнер.setMaxConcurrentConsumers (1); контейнер.setConcurrentConsumers (1); container.setacknowledgemode (ancknowledgemode.manual); // Установить режим подтверждения вручную подтвердить контейнер.setmessageListener (new Channelawaremessagelistener () {public void onmessage (сообщение сообщением, com.rabbitmq.client.calel Channel) Throws Exception {byte [] body = message.getbody (); System.out.println ("destive_Queue2 Получил сообщение:" + string (body. body.out.println ("destive_queue2); channel.basicack (message.getmessageproperties (). getDeliveryTag (), false); вернуть контейнер; }}Просто обрабатывайте задачи, которые необходимо регулярно обрабатывать во время прослушивания сообщений. Поскольку Rabbitmq может отправлять сообщения, вы можете отправить код функции задачи, такой как закрытие заказа и отправка идентификатора заказа, что позволяет избежать необходимости запросить те заказы, которые необходимо закрыть и увеличивать бремя на MySQL. В конце концов, после того, как объем заказа будет большим, сам запрос также является очень дорогой вещью.
Суммировать
Реализация задач, основанных на RabbitMQ, состоит в том, чтобы установить время истечения срока действия для сообщения, поместить его в очередь, которая не прочитана, чтобы сообщение автоматически было передано в другую очередь после истечения срока действия и отслеживать слушатель этого сообщения в очереди, чтобы выполнить конкретные операции задач по времени.
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.