1. Preface
There are 2 important concepts in message middleware: message broker and destination. When the message sender sends the message, the message is taken over by the message broker, which ensures that the message is delivered to the specified destination.
Our commonly used message brokers include JMS and AMQP specifications. Correspondingly, their common implementations are ActiveMQ and RabbitMQ.
2. Integrate ActiveMQ
2.1 Add dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId></dependency><!-- If you need to configure the connection pool, add the following dependencies--><dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2.2 Add configuration
# activemq Configure spring.activemq.broker-url=tcp://192.168.2.12:61616spring.activemq.user=adminspring.activemq.password=adminspring.activemq.pool.enabled=falsespring.activemq.pool.max-connections=50# When using publish/subscribe mode, the following configuration needs to be set to truespring.jms.pub-sub-domain=false
Here spring.activemq.pool.enabled=false means closing the connection pool.
2.3 Coding
Configuration class:
@Configurationpublic class JmsConfirguration { public static final String QUEUE_NAME = "activemq_queue"; public static final String TOPIC_NAME = "activemq_topic"; @Bean public Queue queue() { return new ActiveMQQueue(QUEUE_NAME); } @Bean public Topic topic() { return new ActiveMQTopic(TOPIC_NAME); }}Responsible for creating queues and topics.
Message Producer:
@Componentpublic class JmsSender { @Autowired private Queue queue; @Autowired private Topic topic; @Autowired private JmsMessagingTemplate jmsTemplate; public void sendByQueue(String message) { this.jmsTemplate.convertAndSend(queue, message); } public void sendByTopic(String message) { this.jmsTemplate.convertAndSend(topic, message); }}Message Consumer:
@Componentpublic class JmsReceiver { @JmsListener(destination = JmsConfirguration.QUEUE_NAME) public void receiveByQueue(String message) { System.out.println("Receive queue message:" + message); } @JmsListener(destination = JmsConfirguration.TOPIC_NAME) public void receiveByTopic(String message) { System.out.println("Receive topic message:" + message); }}Message consumers listen to messages using @JmsListener annotation.
2.4 Test
@RunWith(SpringRunner.class)@SpringBootTestpublic class JmsTest { @Autowired private JmsSender sender; @Test public void testSendByQueue() { for (int i = 1; i < 6; i++) { this.sender.sendByQueue("hello activemq queue " + i); } } @Test public void testSendByTopic() { for (int i = 1; i < 6; i++) { this.sender.sendByTopic("hello activemq topic " + i); } }}Print result:
Receive queue message: hello activemq queue 1
Receive queue message: hello activemq queue 2
Receive queue message: hello activemq queue 3
Receive queue message: hello activemq queue 4
Receive queue messages: hello activemq queue 5
Set spring.jms.pub-sub-domain=true when testing publish/subscribe mode
Receive topic message: hello activemq topic 1
Receive topic message: hello activemq topic 2
Receive topic messages: hello activemq topic 3
Receive topic messages: hello activemq topic 4
Receive topic messages: hello activemq topic 5
3. Integrate RabbitMQ
3.1 Add dependencies
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
3.2 Add configuration
spring.rabbitmq.host=192.168.2.30spring.rabbitmq.port=5672spring.rabbitmq.username=lightspring.rabbitmq.password=lightspring.rabbitmq.virtual-host=/test
3.3 Coding
Configuration class:
@Configurationpublic class AmqpConfirguration { //================================================= public static final String SIMPLE_QUEUE = "simple_queue"; @Bean public Queue queue() { return new Queue(SIMPLE_QUEUE, true); } //================= Public static final String PS_QUEUE_1 = "ps_queue_1"; public static final String PS_QUEUE_2 = "ps_queue_2"; public static final String FANOUT_EXCHANGE = "fanout_exchange"; @Bean public Queue psQueue1() { return new Queue(PS_QUEUE_1, true); } @Bean public Queue psQueue2() { return new Queue(PS_QUEUE_2, true); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(psQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(psQueue2()).to(fanoutExchange()); } //========================= public static final String ROUTING_QUEUE_1 = "routing_queue_1"; public static final String ROUTING_QUEUE_2 = "routing_queue_2"; public static final String DIRECT_EXCHANGE = "direct_exchange"; @Bean public Queue routingQueue1() { return new Queue(ROUTING_QUEUE_1, true); } @Bean public Queue routingQueue2() { return new Queue(ROUTING_QUEUE_2, true); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } @Bean public Binding directBinding1() { return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order"); } //============== Theme mode===================== public static final String TOPIC_QUEUE_1 = "topic_queue_1"; public static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static final String TOPIC_EXCHANGE = "topic_exchange"; @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE_1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE_2, true); } @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#"); } }RabbitMQ has multiple working modes, so there are many configurations. Readers who want to know about relevant content can check out "Introduction to RabbitMQ Work Mode" or Baidu related information on their own.
Message Producer:
@Componentpublic class AmqpSender { @Autowired private AmqpTemplate amqpTemplate; /** * Simple mode send* * @param message */ public void simpleSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message); } /** * Publish/subscribe mode send* * @param message */ public void psSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message); } /** * Send in routing mode* * @param message */ public void routingSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message); } /** * Send in theme mode* * @param routingKey * @param message */ public void topicSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message); }}Message Consumer:
@Componentpublic class AmqpReceiver { /** * Simple mode reception* * @param message */ @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE) public void simpleReceive(String message) { System.out.println("Receive message:" + message); } /** * Publish/Subscribe mode reception* * @param message */ @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1) public void psReceive1(String message) { System.out.println(AmqpConfirguration.PS_QUEUE_1 + "Receive message:" + message); } @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2) public void psReceive2(String message) { System.out.println(AmqpConfirguration.PS_QUEUE_2 + "Receive message:" + message); } /** * Routing mode reception* * @param message */ @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1) public void routingReceive1(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "Receive message:" + message); } @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2) public void routingReceive2(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "Receive message:" + message); } /** * Topic mode reception* * @param message */ @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1) public void topicReceive1(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "Receive message:" + message); } @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2) public void topicReceive2(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "Receive Message:" + message); }}Message consumers listen to messages using @RabbitListener annotation.
3.4 Test
@RunWith(SpringRunner.class)@SpringBootTestpublic class AmqpTest { @Autowired private AmqpSender sender; @Test public void testSimpleSend() { for (int i = 1; i < 6; i++) { this.sender.simpleSend("test simpleSend " + i); } } @Test public void testPsSend() { for (int i = 1; i < 6; i++) { this.sender.psSend("test psSend " + i); } } @Test public void testPsSend() { for (int i = 1; i < 6; i++) { this.sender.psSend("test psSend " + i); } } @Test public void testRoutingSend() { for (int i = 1; i < 6; i++) { this.sender.routingSend("order", "test routingSend " + i); } } @Test public void testTopicSend() { for (int i = 1; i < 6; i++) { this.sender.topicSend("user.add", "test topicSend " + i); } }}The test results are skipped. . .
Reminder 1: ACCESS_REFUSED Login was refused using authentication mechanism PLAIN
Solution:
1) Please make sure that the username and password are correct. It is important to note whether the values of the username and password contain spaces or tabs (the author tested it because the password had one more tab character, which caused the authentication failure).
2) If the test account is using guest, you need to modify the rabbitmq.conf file. Add the "loopback_users = none" configuration to the file.
Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it
Solution:
We can log in to the RabbitMQ management interface and manually add the corresponding queue in the Queue option.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.