Cena
As tarefas cronometradas são frequentemente necessárias no desenvolvimento. Para shoppings, as tarefas cronometradas são particularmente numerosas, como expiração cronometrada do cupom, fechamento cronometrado da ordem, pagamento do WeChat por 2 horas sem pagar para encerrar ordens, etc., todas as quais exigem tarefas cronometradas. No entanto, há um problema com as próprias tarefas de tempo. De um modo geral, consultamos o banco de dados por meio de pesquisa cronometrada para determinar se existem tarefas a serem executadas. Ou seja, não importa o quê, precisamos consultar o banco de dados primeiro. Algumas tarefas têm requisitos altos para precisão de tempo e precisam consultar uma vez por segundo. Não importa se o sistema é pequeno. Se o sistema em si for grande e houver muitos dados, isso não é muito realista, portanto, outros métodos são necessários. Obviamente, existem muitas maneiras de implementá -las, como filas de tempo de implementação de Redis, filas de atraso do JDK com base em filas de prioridade, rodadas de tempo etc. porque usamos o RabbitMQ em nosso projeto, com base no princípio de fácil desenvolvimento e manutenção, usamos a fila de atraso do RabbitMQ para implementar as tarefas do tempo. Se você não sabe o que é o RabbitMQ ou como o Springboot integra o RabbitMQ, você pode verificar meu artigo anterior Spring Boot Integrated RabbitMQ
Fila de atraso do RabbitMQ
O próprio RabbitMQ não possui uma fila de atraso e só pode ser implementado através das características da fila do RabbitMQ. Se o RabbitMQ quiser implementar as filas de atraso, você precisa usar o interruptor de letra morta do RabbitMQ (troca) e o tempo de sobrevivência da mensagem TTL (hora de viver)
Interruptor de letra morta
Uma mensagem entrará em um interruptor de letra morta se as seguintes condições forem cumpridas. Lembre -se de que este é um interruptor em vez de uma fila. Um interruptor pode corresponder a muitas filas.
O interruptor da letra morta é um interruptor comum, mas como jogamos mensagens expiradas, é chamado de interruptor de letra morta. Isso não significa que o interruptor da letra morta seja um interruptor específico.
Mensagem TTL (hora da sobrevivência da mensagem)
O TTL da mensagem é o tempo de sobrevivência da mensagem. O RabbitMQ pode definir TTL para filas e mensagens separadamente. A configuração da fila significa que a fila não tem tempo de retenção conectado aos consumidores e você também pode fazer configurações separadas para cada mensagem individual. Após esse período, pensamos que as notícias estão mortas e é chamada de carta de morte. Se a fila estiver definida e a mensagem for definida, a pequena será realizada. Portanto, se uma mensagem for roteada para uma fila diferente, o tempo de morte desta mensagem pode ser diferente (diferentes configurações da fila). Aqui falamos sobre o TTL de uma única mensagem, porque é a chave para implementar tarefas atrasadas.
byte [] messageBodyBytes = "Olá, mundo!". getBytes (); AMQP.BASICPROPERTIES PROPERTIES = NOVA AMQP.BASICPROPERTIES (); Propriedades.Setexpiration ("60000"); Channel.Basicpublish ("My-Exchange", "Filee-key", Propriedades, MessageBodyBytes);Você pode definir o tempo definindo o campo de expiração da mensagem ou a propriedade X-Message-TTL, os quais têm o mesmo efeito. Só que o campo de expiração é um parâmetro de string, então você precisa escrever uma string do tipo int: quando a mensagem acima é lançada na fila, 60 segundos passados, se não for consumido, ela morrerá. Não será consumido pelos consumidores. As notícias por trás desta notícia não são "mortas" e são consumidas pelos consumidores. As cartas mortas não serão excluídas e liberadas na fila, elas serão contadas no número de mensagens na fila.
Fluxograma de processo
Crie interruptores e filas
Crie um interruptor de letra morta
Conforme mostrado na figura, é para criar um comutador comum. Por uma questão de distinção fácil, o nome do interruptor é atraso
Crie uma fila de mensagens de expiração automática
A principal função desta fila é fazer com que as mensagens expirem regularmente. Por exemplo, se precisarmos fechar o pedido em 2 horas, precisamos colocar a mensagem nesta fila e definir o tempo de expiração da mensagem para 2 horas
Crie uma fila expirada automaticamente denominada detrase_queue1. Obviamente, os parâmetros da imagem não expirarão automaticamente a mensagem, porque não definimos o parâmetro X-Message-TTL. Se as mensagens em toda a fila forem as mesmas, você poderá defini -la. Para flexibilidade, não está definido. Os outros dois parâmetros X-Letter-Trochange representam o comutador ao qual a mensagem entrará após o expulso da mensagem. A configuração aqui é o atraso, ou seja, a troca de letra morta. A tecla X-Dead-Letter-Routing é configurar a chave de roteamento do interruptor da letra morta após a mensagem. O mesmo vale para enviar a chave de roteamento da mensagem. De acordo com essa chave, a mensagem será colocada em uma fila diferente.
Crie uma fila de processamento de mensagens
Esta fila é a fila que realmente processa mensagens, e todas as mensagens que entram nesta fila serão processadas
O nome da fila de mensagens é atrasado_queue2
Fila de mensagens com destino a mudar
Digite a página Detalhes do Switch e vincule as duas filas criadas ( Queee1 de atraso e a fila de atraso2 ) ao comutador.
A chave de roteamento da fila de mensagens de expiração automática é definida para atrasar
Atraso na fila de atraso2
A chave do atraso na Queue2 deve ser definida para criar um parâmetro de chave de escavação X de morto-escavador expirado automaticamente, de modo que, quando a mensagem expirar, a mensagem pode ser automaticamente colocada na fila de atraso_queue2.
A página de gerenciamento encadernada é como mostrado na figura:
Obviamente, essa ligação também pode ser implementada usando código, apenas para expressão intuitiva, portanto a plataforma de gerenciamento usada neste artigo é usada para operar
Envie uma mensagem
String msg = "hello word"; MessageProperties messageProperties = new messageProperties (); MessageProperties.Setexpiration ("6000"); MessageProperties.SetCorreLationId (uuid.randomuuid (). ToString (). getBytes ()); Mensagem mensagem = nova mensagem (msg.getBytes (), messageProperties); rabbittemplate.convertandSend ("atraso", "atraso", mensagem);O código principal é
MessageProperties.Setexpiration ("6000");Defina a mensagem para expirar após 6 segundos
Nota: Como a mensagem deve ser expirada automaticamente, você não deve definir a escuta do deoun_queue1 e as mensagens nesta fila não podem ser aceitas. Caso contrário, uma vez que a mensagem for consumida, não haverá expiração.
Receber mensagem
Basta configurar o atraso_queue2 para ouvir mensagens
pacote wang.raye.rabbitmq.demo1; importar org.springframework.amqp.core.acknowledgemode; importar org.springframework.amqp.core.binding; importar org.springframework.amqp.core.binding; importar org.springframework.amqp.core.bindingbuilder; importar org.springframework.amqp.core.directExchange; importar org.springframework.amqp.core.message; importar org.springframework.amqp.core.queue; importar org.springframework.amqp.rabbit.connection.cachingConnectionFactory; importar org.springframework.amqp.rabbit.connection.connectionFactory; importar org.springframework.amqp.rabbit.core.channelawaremessagelistener; importar org.springframework.amqp.rabbit.listener.simpleMessaGelistEnerContainer; importar org.springframework.beans.factory.annotation.autowired; importar org.springframework.context.annotation.bean; importar org.springframework.context.annotation.configuration; @ConfigurationPublic Classe deLeweue { / ** Nome do interruptor da mensagem* / public static final string Exchange = "atraso"; / ** fila key1*/ public static final string roteingKey1 = "atraso"; / ** fila key2*/ public static final string routingKey2 = "deoun_key"; / *** Informações do link de configuração* @return*/ @Bean public ConnectionFactory ConnectionFactory () {CachingConnectionFactory ConnectionFactory = new CachingConnectionFactory ("120.76.237.8", 5672); ConnectionFactory.setUsername ("Kberp"); ConnectionFactory.SetPassword ("Kberp"); ConnectionFactory.SetVirtualHost ("/"); ConnectionFactory.SetPublisherConfirms (true); // Return ConnectionFactory deve ser definido; } / *** Configure o interruptor da mensagem* Configure o fanoutExchange para os consumidores: distribua mensagens para todas as filas ligadas, sem o conceito de roteingkey headerSexchange: corresponde ao DirectExChange adicionando o valor do atributo: Multi-TELLE (@Bean the RetwerChank (@DeeTCHANCHANCENCHANCENCHANCENCIMEX (DIRETHCHANGET (Distribui), que é necessário, o valor do traseiro, que se retex) falso); } / ** * Configurar a fila da mensagem 2 * Configurar * / @Bean public fila fila () {return new file ("deoun_queue2", true); // CITAÇÃO PERSISTENTE}/*** Ligue a fila de mensagens 2 com o comutador* Configure para os consumidores* @return*/@Bean @AUTOWIRED Public Binding Binding () {return bindingbuilder.bind (QUEUE ()). Para (defaultExchange ()). Com (deLeouse.ROUTEKE2); } / *** Aceite o ouvinte da mensagem, este ouvinte aceitará a mensagem da fila de mensagens 1* Configurar para os consumidores* @return* / @Bean @AUTOWIRED Public SimpleMessaGelistEnerContainer MessagECONTAINER2 (ConnectionFactory ConnectionFactory) {SimpleMessessageListenerArCoTainner CaiMeReSess = NewemEssAssagel (SimpationAnTeNeStActory) {SimpleMessessageListerEnnerCoTerCoTerC); container.setQueues (Queue ()); container.setexposelistenerChannel (true); container.setMaxConcurrentConsumers (1); container.setConcurrentConsumers (1); container.setackNowledGemode (reconheceuMode.Manual); // Definir modo de confirmação confirmar manualmente o container.SetMessageListener (novo ChannelAwaremessageListener () {public void onMessage (mensagem da mensagem, com.rabbitmq.client.Channel Channel) lança ("deLeue2 [] body = message.getbody; canal.basicack (message.getMessageProperties (). getDeliveryTag (), false); recipiente de retorno; }}Basta lidar com tarefas que precisam ser processadas regularmente durante a escuta de mensagens. Como o RabbitMQ pode enviar mensagens, você pode enviar o código do recurso de tarefa, como fechar o pedido e enviar o ID do pedido, o que evita a necessidade de consultar os pedidos que precisam ser fechados e aumentar a carga no MySQL. Afinal, uma vez que o volume de pedidos é grande, a própria consulta também é uma coisa muito cara.
Resumir
A implementação de tarefas de tempo com base no RabbitMQ é definir um tempo de validade para a mensagem, colocá -la em uma fila que não é lida, para que a mensagem seja automaticamente transferida para outra fila após a expiração e monitore o ouvinte desta mensagem de fila para lidar com as operações específicas das tarefas de tempo.
O exposto acima é todo o conteúdo deste artigo. Espero que seja útil para o aprendizado de todos e espero que todos apoiem mais o wulin.com.