장면
시간이 정해진 작업은 종종 개발에 필요합니다. 쇼핑몰의 경우, 시간 완료 만료, 주문 시간 마감, 주문 마감 명령 등을 지불하지 않고 2 시간 동안 WeChat 결제 등 시간이 필요합니다. 그러나 타이밍 작업 자체에 문제가 있습니다. 일반적으로 말하면, 우리는 시간이 정해진 폴링을 통해 데이터베이스를 쿼리하여 실행해야 할 작업이 있는지 여부를 결정합니다. 즉, 무엇이든 먼저 데이터베이스를 쿼리해야합니다. 일부 작업은 시간 정확도에 대한 요구 사항이 높으며 1 초에 한 번 쿼리해야합니다. 시스템이 작 든 상관 없습니다. 시스템 자체가 크고 데이터가 많으면 현실적이지 않으므로 다른 방법이 필요합니다. 물론, Redis 구현 타이밍 큐, 우선 순위 대기열, 시간 라운드 등을 기반으로 JDK 지연 큐 등과 같은 여러 가지 방법이 있습니다. 우리는 손쉬운 개발 및 유지 보수의 원칙에 따라 프로젝트에서 RabbitMQ를 사용하기 때문에 RabbitMQ 지연 큐를 사용하여 타이밍 작업을 구현합니다. RabbitMQ가 무엇인지 또는 SpringBoot가 RabbitMQ를 통합하는 방법을 모르는 경우, 이전 기사 Spring Boot Integrated Rabbitmq를 확인할 수 있습니다.
RabbitMQ 지연 대기열
RabbitMQ 자체는 지연 대기열이 없으며 RabbitMQ의 큐의 특성을 통해서만 구현할 수 있습니다. RabbitMQ가 지연 대기열을 구현하려면 RabbitMQ의 데드 레터 스위치 (Exchange) 및 메시지 생존 시간 TTL (라이브 시간)을 사용해야합니다.
데드 레터 스위치
다음 조건이 충족되면 메시지가 데드 레터 스위치를 입력합니다. 이것은 대기열 대신 스위치입니다. 스위치는 많은 대기열에 해당 할 수 있습니다.
데드 레터 스위치는 평범한 스위치이지만 만료 된 메시지를 버리기 때문에 데드 레터 스위치라고합니다. 데드 레터 스위치가 특정 스위치라는 것을 의미하지는 않습니다.
메시지 TTL (메시지 생존 시간)
메시지의 TTL은 메시지의 생존 시간입니다. RabbitMQ는 대기열과 메시지에 대한 TTL을 별도로 설정할 수 있습니다. 대기열 설정은 큐에 소비자와 연결된 유지 시간이 없으며 각 개별 메시지에 대해 별도의 설정을 만들 수도 있음을 의미합니다. 그 후, 우리는 뉴스가 죽었다고 생각하며,이를 죽음의 편지라고합니다. 큐가 설정되고 메시지가 설정되면 작은 메시지가 표시됩니다. 따라서 메시지가 다른 대기열로 라우팅되면이 메시지의 죽음 시간이 다를 수 있습니다 (다른 대기열 설정). 여기서는 지연된 작업을 구현하는 데 열쇠이기 때문에 단일 메시지의 TTL에 대해 이야기합니다.
바이트 [] messagebodybytes = "hello, world!". getBytes (); amqp.basicproperties 속성 = 새로운 amqp.basicproperties (); 속성 .setexpiration ( "60000"); channel.basicpublish ( "my-exchange", "queue-key", 속성, MessageBodyBytes);
메시지의 만료 필드 또는 X-Message-TTL 속성을 설정하여 시간을 설정할 수 있습니다. 만료 필드는 문자열 매개 변수이므로 int-type 문자열을 작성해야합니다. 위의 메시지가 큐에 던져지면 60 초가 소비되지 않으면 죽을 것입니다. 소비자가 소비하지 않습니다. 이 뉴스의 배후에 대한 소식은 "죽은"것이 아니며 소비자가 소비합니다. 죽은 편지는 큐에서 삭제 및 릴리스되지 않으며 대기열의 메시지 수로 계산됩니다.
프로세스 흐름도
스위치와 대기열을 만듭니다
데드 레터 스위치를 만듭니다
그림에서 볼 수 있듯이 일반 스위치를 만드는 것입니다. 쉬운 구별을 위해 스위치의 이름은 지연입니다.
자동 만료 메시지 대기열을 만듭니다
이 대기열의 주요 기능은 메시지가 정기적으로 만료되는 것입니다. 예를 들어, 주문을 2 시간 안에 닫아야하는 경우 메시지를이 큐에 넣고 메시지 만료 시간을 2 시간으로 설정해야합니다.
Delay_queue1이라는 자동으로 만료 된 대기열을 만듭니다. 물론 x-message-ttl 매개 변수를 설정하지 않기 때문에 그림의 매개 변수는 자동으로 메시지를 만료하지 않습니다. 전체 대기열의 메시지가 동일하다면 설정할 수 있습니다. 유연성을 위해 설정되지 않았습니다. 다른 두 매개 변수 x-dead-retter-exchange는 메시지가 만료 된 후 메시지가 입력되는 스위치를 나타냅니다. 여기서 구성은 지연, 즉 데드 레터 스위치입니다. X-Dead-Letter-Routing-Key는 메시지가 만료 된 후 Dead Letter Switch의 라우팅 키를 구성하는 것입니다. 메시지의 라우팅 키를 보내는 것도 마찬가지입니다. 이 키에 따르면 메시지는 다른 줄에 배치됩니다.
메시지 처리 큐를 만듭니다
이 대기열은 메시지를 진정으로 처리하는 대기열 이며이 대기열을 입력하는 모든 메시지가 처리됩니다.
메시지 큐의 이름은 Delay_queue2입니다
메시지 큐가 스위치에 바인딩됩니다
스위치 세부 사항 페이지를 입력하고 생성 된 두 큐 (지연 큐 1 및 지연 큐 2)를 스위치에 바인딩하십시오.
자동 만료 메시지 큐의 라우팅 키가 지연되도록 설정되었습니다.
바인딩 지연 퀴 루 2
지연 큐의 키는 자동으로 만료 된 큐 X- 드레드 레터-라우팅 키 매개 변수를 생성하도록 설정되어있어서 메시지가 만료되면 메시지를 Delay_Queue2 큐에 자동으로 배치 할 수 있습니다.
바운드 관리 페이지는 그림과 같이입니다.
물론,이 바인딩은 직관적 인 표현을 위해 코드를 사용하여 구현 될 수 있으므로이 기사에 사용 된 관리 플랫폼은 작동하는 데 사용됩니다.
메시지를 보내십시오
문자열 msg = "hello Word"; MessageProperties MessageProperties = 새 MessageProperties (); MessageProperties.setexpiration ( "6000"); MessageProperties.setCorrelationId (uuid.randomuuid (). toString (). getBytes ()); 메시지 메시지 = 새 메시지 (msg.getBytes (), MessageProperties); Rabbittemplate.convertandSend ( "지연", "지연", 메시지);
주요 코드는입니다
MessageProperties.setexpiration ( "6000");
6 초 후에 메시지를 만료하도록 설정하십시오
참고 : 메시지가 자동으로 만료되므로 Delay_queue1의 청취를 설정하지 않아야 하며이 대기열의 메시지를 수락 할 수 없습니다. 그렇지 않으면 메시지가 소비되면 만료가 없습니다.
메시지를받습니다
Delay_Queue2를 수신하는 메시지를 듣도록 구성하십시오
Package Wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.acknowledgemode; org.springframework.amqp.core.binding import; org.springframework.amqp.core.binding import; org.springframework.amqp.core.bindingbuilder import; import org.springframework.amqp.core.directexchange; import org.springframework.amqp.core.message; import org.springframework.amqp.core.queue; org.springframework.amqp.rabbit.connection.cachingconnectionfactory; org.springframework.amqp.rabbit.connection.connectionFactory; org.springframework.amqp.rabbit.core.channelawaremessagelistener; org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer; org.springframework.beans.factory.annotation.autowired; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configurationpublic class Delayqueue { / ** 메시지 스위치* / public static final string exchange = "Delay"; / ** 큐 key1*/ public static final String RoutingKey1 = "Delay"; / ** 큐 key2*/ public static final String RoutingKey2 = "Delay_key"; / *** 구성 링크 정보* @return*/ @Bean public connectionFactory ConnectionFactory () {CachingConnectionFactory ConnectionFactory = 새로운 CachingConnectionFactory ( "120.76.237.8", 5672); ConnectionFactory.setUserName ( "Kberp"); ConnectionFactory.setPassword ( "Kberp"); ConnectionFactory.setVirtualHost ( "/"); ConnectionFactory.setPublisherConFirms (true); // return connectionFactory를 설정해야합니다. } / *** 메시지 스위치 구성* 소비자에 대한 FanOutexChange 구성 : 라우팅 키 헤드러스 렉스 change : 속성 키 값을 추가하여 DirecTexChange를 일치하지 않고 모든 바운드 큐에 메시지를 배포합니다. 참, 거짓); } / ** * 메시지 구성 큐 2 * 구성 * / @bean public queue queue () {return new queue ( "Delay_queue2", true); // QUOTE resistent}/*** 스위치를 사용하여 메시지 큐 바인드 큐를 바인딩* 소비자를위한 구성* @return*/@autowired public binding binding () {return bindingBuilder.bind (queue ()). to (defaultexchange ()). } / *** 메시지의 청취자를 받아들이면서,이 청취자는 메시지 큐 1에서 메시지를 수락합니다.* 소비자를위한 구성* @eautowired public simplemessagelistenercontainer messagecontainer2 (ConnectionActory ConnectionActory) {SimpleSesagelistEnercontainer 컨테이너 = new SimpleSsageRontener (ConnectionActory); container.setqueues (queue ()); 컨테이너 .SetexposElistenerChannel (true); Container.SetMaxConcurrentConsumers (1); Container.SetConcurrentConsumers (1); 컨테이너 .setackNowledGemode (alknitedGemode.manual); // 확인 모드 설정 수동으로 컨테이너 확인 컨테이너 확인 컨테이너 setMessAgelistener (new ChannelAwaremEsagelistener () {public void onMessage (com.rabbitmq.client.channel 채널)는 예외를 {byte [] body = message.get.get.get ( "system.out.println ("delave_queue2 수신 : " + new string (body (body) ( + new string)); channel.basicack (message.getMessageProperties (). getDeliveryTag (), false); 반환 컨테이너; }}메시지 청취 중에 정기적으로 처리 해야하는 작업을 처리하십시오. RabbitMQ는 메시지를 보낼 수 있으므로 주문 종료 및 주문 ID를 보내는 것과 같은 작업 기능 코드를 보낼 수 있습니다. 이는 폐쇄되어야하는 주문을 쿼리하고 MySQL의 부담을 늘릴 필요가 없습니다. 결국, 순서 볼륨이 크면 쿼리 자체도 매우 비싼 것입니다.
요약
RabbitMQ를 기반으로 타이밍 작업을 구현하면 메시지의 만료 시간을 설정하고 읽지 않은 대기열에 넣어 메시지가 만료 된 후 다른 대기열로 자동 전송 되고이 큐 메시지의 리스너를 모니터링하여 타이밍 작업의 특정 작업을 처리합니다.
위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.