概述
基本概念
Broker
用來處理數據的消息隊列服務器實體
vhost
由RabbitMQ服務器創建的虛擬消息主機,擁有自己的權限機制,一個broker裡可以開設多個vhost,用於不同用戶的權限隔離,vhost之間是也完全隔離的。
productor
產生用於消息通信的數據
channel
消息通道,在AMQP中可以建立多個channel,每個channel代表一個會話任務。
exchange
direct
轉發消息到routing-key指定的隊列
fanout
fanout
轉發消息到所有綁定的隊列,類似於一種廣播發送的方式。
topic
topic
按照規則轉發消息,這種規則多為模式匹配,也顯得更加靈活
queue
queue
binding
表示交換機和隊列之間的關係,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相匹配。
consumer
監聽消息隊列來進行消息數據的讀取
springboot下三種Exchange模式(fanout,direct,topic)實現
pom.xml中引用spring-boot-starter-amqp
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
增加rabbitmq配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
direct
direct模式一般情況下只需要定義queue 使用自帶交換機(defaultExchange)無需綁定交換機
@Configurationpublic class RabbitP2PConfigure { public static final String QUEUE_NAME = "p2p-queue"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); }} @RunWith(SpringRunner.class)@SpringBootTest(classes = BootCoreTestApplication.class)@Slf4jpublic class RabbitTest { @Autowired private AmqpTemplate amqpTemplate; /** * 發送*/ @Test public void sendLazy() throws InterruptedException { City city = new City(234556666L, "direct_name", "direct_code"); amqpTemplate.convertAndSend(RabbitLazyConfigure.QUEUE_NAME, city); } /** * 領取*/ @Test public void receive() throws InterruptedException { Object obj = amqpTemplate.receiveAndConvert(RabbitLazyConfigure.QUEUE_NAME); Assert.notNull(obj, ""); log.debug(obj.toString()); }}適用場景:點對點
fanout
fanout則模式需要將多個queue綁定在同一個交換機上
@Configurationpublic class RabbitFanoutConfigure { public static final String EXCHANGE_NAME = "fanout-exchange"; public static final String FANOUT_A = "fanout.A"; public static final String FANOUT_B = "fanout.B"; public static final String FANOUT_C = "fanout.C"; @Bean public Queue AMessage() { return new Queue(FANOUT_A); } @Bean public Queue BMessage() { return new Queue(FANOUT_B); } @Bean public Queue CMessage() { return new Queue(FANOUT_C); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); }}發送者
@Slf4jpublic class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendFanout(Object message) { log.debug("begin send fanout message<" + message + ">"); rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME, "", message); }}我們可以通過@RabbitListener監聽多個queue來進行消費
@Slf4j@RabbitListener(queues = { RabbitFanoutConfigure.FANOUT_A, RabbitFanoutConfigure.FANOUT_B, RabbitFanoutConfigure.FANOUT_C})public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); }}適用場景
- 大規模多用戶在線(MMO)遊戲可以使用它來處理排行榜更新等全局事件
- 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各種狀態和配置更新
- 在群聊的時候,它被用來分發消息給參與群聊的用戶
topic
這種模式較為複雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”,Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的隊列。
在進行綁定時,要提供一個該隊列關心的主題,如“topic.# (“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。 )
@Configurationpublic class RabbitTopicConfigure { public static final String EXCHANGE_NAME = "topic-exchange"; public static final String TOPIC = "topic"; public static final String TOPIC_A = "topic.A"; public static final String TOPIC_B = "topic.B"; @Bean public Queue queueTopic() { return new Queue(RabbitTopicConfigure.TOPIC); } @Bean public Queue queueTopicA() { return new Queue(RabbitTopicConfigure.TOPIC_A); } @Bean public Queue queueTopicB() { return new Queue(RabbitTopicConfigure.TOPIC_B); } @Bean public TopicExchange exchange() { TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME); topicExchange.setDelayed(true); return new TopicExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchangeTopic(Queue queueTopic, TopicExchange exchange) { return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC); } @Bean public Binding bindingExchangeTopics(Queue queueTopicA, TopicExchange exchange) { return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#"); }}同時去監聽三個queue
@Slf4j@RabbitListener(queues = { RabbitTopicConfigure.TOPIC, RabbitTopicConfigure.TOPIC_A, RabbitTopicConfigure.TOPIC_B})public class Receiver { @RabbitHandler public void receiveMessage(String message) { log.debug("Received <" + message + ">"); }}通過測試我們可以發現
@RunWith(SpringRunner.class)@SpringBootTest(classes = BootCoreTestApplication.class)public class RabbitTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void sendAll() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, "topic.test", "send All"); } @Test public void sendTopic() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC, "send Topic"); } @Test public void sendTopicA() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC_A, "send TopicA"); }}適用場景
- 分發有關於特定地理位置的數據,例如銷售點
- 由多個工作者(workers)完成的後台任務,每個工作者負責處理某些特定的任務
- 股票價格更新(以及其他類型的金融數據更新)
- 涉及到分類或者標籤的新聞更新(例如,針對特定的運動項目或者隊伍)
- 雲端的不同種類服務的協調
- 分佈式架構/基於系統的軟件封裝,其中每個構建者僅能處理一個特定的架構或者係統。
延遲隊列
延遲消費:
延遲重試:
設置交換機延遲屬性為true
@Configurationpublic class RabbitLazyConfigure { public static final String QUEUE_NAME = "lazy-queue-t"; public static final String EXCHANGE_NAME = "lazy-exchange-t"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean public DirectExchange defaultExchange() { DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false); directExchange.setDelayed(true); return directExchange; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME); }}發送時設置延遲時間即可
@Slf4jpublic class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void sendLazy(Object msg) { log.debug("begin send lazy message<" + msg + ">"); rabbitTemplate.convertAndSend(RabbitLazyConfigure.EXCHANGE_NAME, RabbitLazyConfigure.QUEUE_NAME, msg, message -> { message.getMessageProperties().setHeader("x-delay", 10000); return message; } ); }}結束
各種使用案例請直接查看官方文檔
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。