Overview
Basic concepts
Broker
The message queue server entity. It receives messages from producers and delivers them to consumers.
vhost
A virtual host created within the RabbitMQ server, with its own permission mechanism. A single broker can host multiple vhosts to isolate permissions between different users, and vhosts are completely isolated from one another.
Producer
Generates data and publishes it as messages.
channel
A message channel. Multiple channels can be established over AMQP, and each channel represents one session.
exchange
direct
Forwards a message to the queue whose binding key exactly matches the message's routing key.
fanout
Forwards a message to every bound queue, much like a broadcast.
topic
Forwards messages according to rules, usually pattern matching on the routing key, which makes this type more flexible.
queue
binding
Represents the relationship between an exchange and a queue. A binding carries an additional parameter, the binding key, which is matched against the routing key.
Consumer
Listens to a message queue and reads message data from it.
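To make the relationship between binding keys and routing keys more concrete, here is a minimal in-memory sketch of how a direct and a fanout exchange might pick their target queues. It is plain Java and does not use the RabbitMQ client; the class name RoutingSketch and the queue and key names in main are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory sketch of exchange routing; not the RabbitMQ client API. */
class RoutingSketch {

    enum ExchangeType { DIRECT, FANOUT }

    /** A binding links a queue to the exchange under a binding key. */
    private static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    private final ExchangeType type;
    private final List<Binding> bindings = new ArrayList<>();

    RoutingSketch(ExchangeType type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.add(new Binding(queue, bindingKey));
    }

    /** Returns the names of the queues that would receive a message. */
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            // FANOUT ignores the routing key; DIRECT needs an exact match.
            boolean matches = type == ExchangeType.FANOUT
                    || b.bindingKey.equals(routingKey);
            if (matches && !targets.contains(b.queue)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        RoutingSketch direct = new RoutingSketch(ExchangeType.DIRECT);
        direct.bind("orders", "order.created");   // hypothetical names
        direct.bind("audit", "order.deleted");
        System.out.println(direct.route("order.created")); // [orders]

        RoutingSketch fanout = new RoutingSketch(ExchangeType.FANOUT);
        fanout.bind("fanout.A", "");
        fanout.bind("fanout.B", "");
        System.out.println(fanout.route("anything")); // [fanout.A, fanout.B]
    }
}
```

The sketch only decides where a message would go. A real broker also stores the message in each target queue and tracks delivery to consumers.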
Implementing the three exchange modes (fanout, direct, topic) in Spring Boot
Add spring-boot-starter-amqp to pom.xml

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

Add the RabbitMQ configuration

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest

direct
In direct mode, you generally only need to define a queue. Messages go through the built-in default exchange, so no exchange has to be declared or bound.
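The reason no binding is needed is that the default exchange routes a message to the queue whose name equals the routing key. The following plain-Java sketch imitates that behavior in memory; it is an analogy under that assumption rather than the Spring AMQP API, and DefaultExchangeSketch and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory analogy of the default exchange; not the Spring AMQP API. */
class DefaultExchangeSketch {

    // Every declared queue is reachable under its own name.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<String>());
    }

    /** Stores the message in the queue named by routingKey; false if no such queue. */
    boolean publish(String routingKey, String message) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // nothing to route to
        }
        target.add(message);
        return true;
    }

    /** Removes and returns the next message, or null if the queue is empty or unknown. */
    String receive(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("p2p-queue");
        broker.publish("p2p-queue", "hello");
        System.out.println(broker.receive("p2p-queue")); // hello
        System.out.println(broker.receive("p2p-queue")); // null
    }
}
```

Each message is handed out once and then removed from the queue, which is what makes this mode point-to-point.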
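
    // RabbitP2PConfigure.java
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitP2PConfigure {

        public static final String QUEUE_NAME = "p2p-queue";

        // Durable queue, reached through the default exchange by its name.
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, true);
        }
    }

    // RabbitTest.java
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.util.Assert;

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = BootCoreTestApplication.class)
    @Slf4j
    public class RabbitTest {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /** Send */
        @Test
        public void send() {
            // City must be serializable for the default message converter.
            City city = new City(234556666L, "direct_name", "direct_code");
            // No exchange is given, so the queue name acts as the routing key.
            amqpTemplate.convertAndSend(RabbitP2PConfigure.QUEUE_NAME, city);
        }

        /** Receive */
        @Test
        public void receive() {
            Object obj = amqpTemplate.receiveAndConvert(RabbitP2PConfigure.QUEUE_NAME);
            Assert.notNull(obj, "No message was available in the queue");
            log.debug(obj.toString());
        }
    }

Applicable scenarios: point-to-point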
fanout
Fanout mode requires binding multiple queues to the same exchange

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitFanoutConfigure {

        public static final String EXCHANGE_NAME = "fanout-exchange";
        public static final String FANOUT_A = "fanout.A";
        public static final String FANOUT_B = "fanout.B";
        public static final String FANOUT_C = "fanout.C";

        @Bean
        public Queue AMessage() {
            return new Queue(FANOUT_A);
        }

        @Bean
        public Queue BMessage() {
            return new Queue(FANOUT_B);
        }

        @Bean
        public Queue CMessage() {
            return new Queue(FANOUT_C);
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }

        // The parameter names match the queue bean names, so each binding
        // receives the intended queue. A fanout binding needs no binding key.
        @Bean
        public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }

        @Bean
        public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    }

Sender
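
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    // Registered as a bean so that the template can be injected.
    @Slf4j
    @Component
    public class Sender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void sendFanout(Object message) {
            log.debug("begin send fanout message<" + message + ">");
            // A fanout exchange ignores the routing key, so it is left empty.
            rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME, "", message);
        }
    }

A single @RabbitListener can listen to multiple queues and consume from all of them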

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Must be a Spring bean for the listener annotations to take effect.
    @Slf4j
    @Component
    @RabbitListener(queues = {
            RabbitFanoutConfigure.FANOUT_A,
            RabbitFanoutConfigure.FANOUT_B,
            RabbitFanoutConfigure.FANOUT_C
    })
    public class Receiver {

        @RabbitHandler
        public void receiveMessage(String message) {
            log.debug("Received <" + message + ">");
        }
    }

Applicable scenarios
- Massively multiplayer online (MMO) games can use it to handle global events such as leaderboard updates
- Sports news websites can use it to distribute score updates to mobile clients in near real time
- Distributed systems can use it to broadcast state and configuration updates
- Group chats can use it to distribute messages to the participants
topic
This mode is relatively complex. Simply put, each queue declares the topics it is interested in, and every message carries a "title", its routing key. The exchange forwards a message to every queue whose topic pattern matches the routing key.
When binding, provide the topic the queue is interested in, such as "topic.#". Here "#" matches zero or more words and "*" matches exactly one word, with words separated by dots.
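The wildcard rules are easier to follow with a small matcher. The sketch below is a plain-Java illustration of the "#" and "*" semantics described above, not the broker's actual implementation; the class name TopicMatcher and the "topic.*" and "topic.a.b" examples are assumptions chosen for the demo.

```java
/** Illustrative matcher for topic binding keys; not RabbitMQ's own code. */
class TopicMatcher {

    /** Returns true if routingKey matches the bindingKey pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: all words must be consumed
        }
        if (p[i].equals("#")) {
            // "#" either absorbs no word, or absorbs one word and stays active.
            return match(p, i + 1, w, j)
                    || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false; // pattern still expects a word, but none is left
        }
        // "*" accepts any single word; otherwise the words must be equal.
        return (p[i].equals("*") || p[i].equals(w[j]))
                && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic"));     // true
        System.out.println(matches("topic.#", "topic.a.b")); // true
        System.out.println(matches("topic.*", "topic"));     // false
        System.out.println(matches("topic.*", "topic.a"));   // true
        System.out.println(matches("topic.*", "topic.a.b")); // false
    }
}
```

The recursion keeps the sketch short at the cost of efficiency for patterns with many "#" segments.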
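
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitTopicConfigure {

        public static final String EXCHANGE_NAME = "topic-exchange";
        public static final String TOPIC = "topic";
        public static final String TOPIC_A = "topic.A";
        public static final String TOPIC_B = "topic.B";

        @Bean
        public Queue queueTopic() {
            return new Queue(RabbitTopicConfigure.TOPIC);
        }

        @Bean
        public Queue queueTopicA() {
            return new Queue(RabbitTopicConfigure.TOPIC_A);
        }

        // Note: no binding is declared for this queue below,
        // so it receives nothing from the topic exchange.
        @Bean
        public Queue queueTopicB() {
            return new Queue(RabbitTopicConfigure.TOPIC_B);
        }

        // The original built a second exchange instance with setDelayed(true)
        // and then discarded it; that dead code is removed here.
        @Bean
        public TopicExchange exchange() {
            return new TopicExchange(EXCHANGE_NAME);
        }

        // Exact binding key: only the routing key "topic" matches.
        @Bean
        public Binding bindingExchangeTopic(Queue queueTopic, TopicExchange exchange) {
            return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC);
        }

        // Wildcard binding key: "topic" followed by zero or more words.
        @Bean
        public Binding bindingExchangeTopics(Queue queueTopicA, TopicExchange exchange) {
            return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#");
        }
    }

Listen to all three queues at the same time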
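
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Must be a Spring bean for the listener annotations to take effect.
    @Slf4j
    @Component
    @RabbitListener(queues = {
            RabbitTopicConfigure.TOPIC,
            RabbitTopicConfigure.TOPIC_A,
            RabbitTopicConfigure.TOPIC_B
    })
    public class Receiver {

        @RabbitHandler
        public void receiveMessage(String message) {
            log.debug("Received <" + message + ">");
        }
    }

The routing can be observed with the following tests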

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = BootCoreTestApplication.class)
    public class RabbitTest {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        // "topic.test" matches the "topic.#" binding only.
        @Test
        public void sendAll() {
            rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, "topic.test", "send All");
        }

        // "topic" matches both the "topic" and the "topic.#" bindings.
        @Test
        public void sendTopic() {
            rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC, "send Topic");
        }

        // "topic.A" matches the "topic.#" binding only.
        @Test
        public void sendTopicA() {
            rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC_A, "send TopicA");
        }
    }

Applicable scenarios
- Distributing data relevant to a specific geographic location, such as points of sale
- Background task processing by multiple workers, each responsible for a specific set of tasks
- Stock price updates (and other types of financial data updates)
- News updates that involve categories or tags (for example, for specific sports or teams)
- Coordination of different types of services in the cloud
- Distributed software builds or packaging specific to an architecture or system, where each builder can handle only one architecture or system
Delay queue
Delayed consumption:
Delayed retry:
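Both uses rest on the same idea: a message is accepted immediately but only becomes available to consumers once its delay has elapsed. The plain-Java sketch below is an in-memory analogy of that behavior, not a description of how RabbitMQ implements it; DelayedDeliverySketch, its methods, and the message texts are hypothetical, and time is passed in explicitly so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** In-memory analogy of delayed delivery; not how RabbitMQ implements it. */
class DelayedDeliverySketch {

    private static final class Pending {
        final String body;
        final long dueAtMillis;

        Pending(String body, long dueAtMillis) {
            this.body = body;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Held messages, ordered by the time at which they become deliverable.
    private final PriorityQueue<Pending> held =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.dueAtMillis));

    /** Accepts a message at nowMillis but holds it back for delayMillis. */
    void publish(String body, long delayMillis, long nowMillis) {
        held.add(new Pending(body, nowMillis + delayMillis));
    }

    /** Removes and returns every message whose delay has elapsed at nowMillis. */
    List<String> deliverDue(long nowMillis) {
        List<String> ready = new ArrayList<>();
        while (!held.isEmpty() && held.peek().dueAtMillis <= nowMillis) {
            ready.add(held.poll().body);
        }
        return ready;
    }

    public static void main(String[] args) {
        DelayedDeliverySketch exchange = new DelayedDeliverySketch();
        exchange.publish("retry payment", 10_000, 0);
        exchange.publish("send reminder", 3_000, 0);
        System.out.println(exchange.deliverDue(1_000));  // []
        System.out.println(exchange.deliverDue(5_000));  // [send reminder]
        System.out.println(exchange.deliverDue(10_000)); // [retry payment]
    }
}
```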
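Set the exchange's delayed property to true. This relies on the RabbitMQ delayed message exchange plugin being enabled on the broker.

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitLazyConfigure {

        public static final String QUEUE_NAME = "lazy-queue-t";
        public static final String EXCHANGE_NAME = "lazy-exchange-t";

        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME, true);
        }

        // Durable, non-auto-delete direct exchange, declared as delayed.
        @Bean
        public DirectExchange defaultExchange() {
            DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false);
            directExchange.setDelayed(true);
            return directExchange;
        }

        // The queue name doubles as the binding key.
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME);
        }
    }

Set the delay time when sending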
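
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    // Registered as a bean so that the template can be injected.
    @Slf4j
    @Component
    public class Sender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        public void sendLazy(Object msg) {
            log.debug("begin send lazy message<" + msg + ">");
            rabbitTemplate.convertAndSend(
                    RabbitLazyConfigure.EXCHANGE_NAME,
                    RabbitLazyConfigure.QUEUE_NAME,
                    msg,
                    message -> {
                        // Delay in milliseconds (10 seconds).
                        message.getMessageProperties().setHeader("x-delay", 10000);
                        return message;
                    });
        }
    }

Conclusion
For other use cases, refer to the official documentation.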