1. Asynchronous communication
The RMI, Hessian and other technologies we have come into contact with before are all synchronous communication mechanisms. When the client calls a remote method, the client must wait until the remote method is completed before continuing to execute. The client will be blocked during this period (this causes a very bad user experience).
(Synchronous communication)
Synchronous communication is not the only way to interact between programs. In the asynchronous communication mechanism, the client does not need to wait for the service to process the message, can continue to execute, and finally receive and process the message.
(Asynchronous communication)
Advantages of asynchronous communication
No need to wait. The client only needs to send the message to the message broker, and can continue to perform other tasks without waiting, and is sure that the message will be delivered to the corresponding destination.
Message-oriented and decoupling. The client does not need to worry about the interface specifications of the remote service, it only needs to put the message into the message queue and get the results.
2. JMS
1. Introduction
Before JMS emerged, each message broker had a different implementation, which made the message code between different brokers difficult to be universal. JMS (Java Message Service) is a standard that defines a common API that uses message brokers. That is, all implementations that comply with specifications use a common interface, similar to JDBC providing a common interface for database operations.
Several important elements of JMS:
Destination: The channel to which the message is sent from the sender.
ConnectionFactory: Connection factory, used to create connected objects.
Connection: Connection interface, used to create session.
Session: Session interface, used to create the sender, recipient of the message, and the message object itself.
MessageConsumer: The consumer of the message.
MessageProducer: The producer of the message.
XXXMessage: Various types of message objects, including 5 types: ByteMessage, MapMessage, ObjectMessage, StreamMessage and TextMessage.
2. JMS message model
Different message systems have different message models. JMS provides two models: Queue (point-to-point) and Topic (publish/subscribe).
JMS Queue (Point-to-Point) Model
In a peer-to-peer model, the message producer produces a message and sends it to the queue, and the message consumer then takes it out of the queue and consumes the message, but cannot be consumed repeatedly.
As shown in the picture:
Sender 1, sender 2, and sender 3 each send a message to the server;
Messages 1, 2, and 3 will form a queue in order, and the messages in the queue do not know which recipient will be consumed by;
Receivers 1, 2, and 3 respectively fetch a message from the queue for consumption. Each time a message is fetched, the queue will delete the message, which ensures that the message will not be consumed repeatedly.
The JMS Queue model has also become a P2P (Point to Point) model.
JMS Topic (Publish/Subscribe) Model
The biggest difference between the JMS Topic model and the JMS Queue model lies in the part of the message receiving. The Topic model is similar to a WeChat official account. Recipients who subscribe to the official account can receive messages pushed by the official account.
As shown in the picture:
Publishers 1, 2, 3 publish 3 topics 1, 2, 3 respectively;
In this way, the user group that subscribes to topic 1: subscribers 1, 2, 3 can receive topic 1 messages; similarly, subscribers 4, 5, 6 can receive topic 2 messages, and subscribers 7, 8, 9 can receive topic 3 messages.
The JMS Topic model has also become the Pus/Sub model.
Comparison of each element in the two modes:
3. Traditional JMS programming model
Producer:
(1) Create a connection factory ConnectionFactory;
(2) Create a connection using the connection factory;
(3) Start the connection;
(4) Create a session;
(5) Create the destination for message sending;
(6) Create a producer;
(7) Create message type and message content;
(8) Send a message;
Consumer:
(1) Create a connection factory ConnectionFactory;
(2) Create a connection using the connection factory;
(3) Start the connection;
(4) Create a session;
(5) Create the destination for message sending;
(6) Create a consumer (7) Create a message type;
(8) Receive messages;
3. Introduction to ActiveMQ
ActiveMQ is the most popular, powerful open source message bus produced by Apache. ActiveMQ is a JMS Provider implementation that fully supports JMS1.1 and J2EE 1.4 specifications. Although the JMS specification has been introduced for a long time, JMS still plays a special position among today's J2EE applications.
ActiveMQ main features:
Writing clients in multiple languages and protocols. Languages: Java, C, C++, C#, Ruby, Perl, Python, PHP. Application protocol:
OpenWire, Stomp REST, WS Notification, XMPP, AMQP
Fully support for JMS1.1 and J2EE 1.4 specifications (persistence, XA messages, transactions)
For spring support, ActiveMQ can be easily embedded into the system using Spring, and also supports Spring 2.0 features. Passed the test of common J2EE servers (such as Geronimo, JBoss 4, GlassFish, WebLogic). Through the configuration of JCA 1.5 resource adaptors, ActiveMQ can be automatically deployed to any compatible J2EE 1.4 commercial server and supports multiple transmission protocols: in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA
Supports high-speed message persistence through JDBC and journal, which ensures high-performance clusters, client-server, and point-to-point support for Ajax
Support integration with Axis to easily call the embedded JMS provider for testing
4. ActiveMQ actual combat
Let's take a look at how ActiveMQ implements a simple message queue.
Traditional JMS programming model
1. JMS Queue model code implementation:
Producer:
package com.wgs.mq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created by GenshenWang.nomico on 2017/10/19. */public class ActiveMQProducer {private static final String URL = "tcp://localhost:61616";private static final String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {//1 Create a connection factory ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);//2 Create a connection using a connection factory Connection connection = connectionFactory.createConnection();//3 Start the connection connection.start();//4 Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5 Create a destination for sending a message Destination destination = session.createQueue(QUEUE_NAME);//6 Create a producer MessageProducer messageProducer = session.createProducer(destination);//7 Create message TextMessage textMessage = session.createTextMessage(); for (int i = 1; i <= 100; i++) {//8 Create message content textMessage.setText("Sender- 1 -Send message: " + i);//9 Send message messageProducer.send(textMessage);}System.out.println("Message sent successfully");session.close();connection.close();}}Conusmer:
package com.wgs.mq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Created by GenshenWang.nomico on 2017/10/19. */public class ActiveMQConsumer {private static final String URL = "tcp://localhost:61616";private static final String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {//1 Create a connection factory ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);//2 Create a connection using a connection factory Connection connection = connectionFactory.createConnection();//3 Start the connection connection.start();//4 Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5 Create a destination for sending a message Destination destination = session.createQueue(QUEUE_NAME);//6 Create a consumer MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {//7 Create message TextMessage textMessage = (TextMessage)message;try {//7 Receive message System.out.println("Consumer- 1 - Receive message: [" + textMessage.getText() + "]");}catch (JMSException e) {e.printStackTrace();}}});}}2. JMS Topic model code implementation:
Producer:
package com.wgs.mq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Publish subscription mode* Created by GenshenWang.nomico on 2017/10/19. */public class ActiveMQProducer {private static final String URL = "tcp://localhost:61616";private static final String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {//1 Create a connection factory ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);//2 Create a connection using a connection factory Connection connection = connectionFactory.createConnection();//3 Start the connection connection.start();//4 Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5 Create a destination for sending messages with topic Destination destination = session.createTopic(TOPIC_NAME);//6 Create a producer MessageProducer messageProducer = session.createProducer(destination);//7 Create a message TextMessage textMessage = session.createTextMessage(); for (int i = 1; i <= 100; i++) {//8 Create message content textMessage.setText("Sender- 1 -Send message: " + i);//9 Send message messageProducer.send(textMessage);}System.out.println("Message sent successfully");session.close();connection.close();}}Consumer:
package com.wgs.mq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * Publish subscription mode* Created by GenshenWang.nomico on 2017/10/19. */public class ActiveMQConsumer {private static final String URL = "tcp://localhost:61616";private static final String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {//1 Create a connection factory ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);//2 Create a connection using a connection factory Connection connection = connectionFactory.createConnection();//3 Start the connection.start();//4 Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5 Create the destination for sending the message Destination destination = session.createTopic(TOPIC_NAME);//6 Create a consumer MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {//7 Create a message TextMessage textMessage = (TextMessage)message;try {//7 Receive message System.out.println("Consumer- 1 - Receive message: [" + textMessage.getText() + "]");}catch (JMSException e) {e.printStackTrace();}}});}}JMS templates using Spring
Although JMS provides a unified interface for all message brokers, like JDBC, it can appear complicated when handling connections, statements, result sets and exceptions. However, Spring provides us with JmsTemplate to eliminate redundant and duplicate JMS code.
Let’s take a look at how to use JmsTemplate to implement message queues.
1. JMS Queue model code implementation:
Configuration file:
producer.xml:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ConnectionFactory provided by ActiveMQ--> <bean id="targetConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Configure the JMS connection factory in Spring to connect to the ConnectionFactory provided by ActiveMQ--> <bean id="connectionFactory"> <property name="targetConnectionFactory" ref = "targetConnectionFactory"/> </bean> <!-- Configure JmsTemplate to send messages --> <bean id="jmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- Configure the name of the queue destination --> <bean id="queueDestination"> <constructor-arg value="queue-spring-name"/> </bean> <!-- Configure the name of the queue destination --> <bean id="topicDestination"> <constructor-arg value="topic-spring-name"/> </bean> <bean id="producerServiceImpl"/></beans>
consumer.xml:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ConnectionFactory provided by ActiveMQ--> <bean id="targetConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Configure the JMS connection factory in Spring to connect to the ConnectionFactory provided by ActiveMQ--> <bean id="connectionFactory"> <property name="targetConnectionFactory" ref = "targetConnectionFactory"/> </bean> <!-- Configure the name of the queue destination --> <bean id="queueDestination"> <constructor-arg value="queue-spring-name"/> </bean> <!-- Configure the message listener --> <bean id="consumerMessageListener"/> <!-- Configure the name of the queue destination --> <bean id="jmsContainer"> <property name="destination" ref="queueDestination"/> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageListener" ref="consumerMessageListener"/> </bean> <!-- Configure the name of the queue destination--> <bean id="topicDestination"> <constructor-arg value="topic-spring-name"/> </bean></beans>
Producer:
(1) Write an interface first:
package com.wgs.jms.producer;/** * Created by GenshenWang.nomico on 2017/10/20. */public interface ActiveMQProducerService { void sendMessage(final String message);}(2) Interface implementation:
package com.wgs.jms.producer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import javax.annotation.Resource;import javax.jms.*;/** * Created by GenshenWang.nomico on 2017/10/20. */public class ActiveMQProducerServiceImpl implements ActiveMQProducerService {@Autowired JmsTemplate jmsTemplate;@Resource(name = "queueDestination") Destination destination;public void sendMessage(final String message) {jmsTemplate.send(destination, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message);return textMessage;}});System.out.println("Producer- 1 -Send message successfully:" + message);}}(3) Test:
package com.wgs.jms.producer;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * Created by GenshenWang.nomico on 2017/10/20. */public class ActiveMQProducerMain {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");ActiveMQProducerService service = context.getBean(ActiveMQProducerService.class);for (int i = 0; i < 100; i++) {service.sendMessage("test" + i);}context.close();}} consumer:
(1) Create a message listener:
package com.wgs.jms.consumer;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;/** * Created by GenshenWang.nomico on 2017/10/20. */public class ConsumerMessageListener implements MessageListener {public void onMessage(Message message) {try {TextMessage textMessage = (TextMessage) message;System.out.println("Consumer-1 -Receive message:" + textMessage.getText());}catch (JMSException e) {e.printStackTrace();}}}(2) Test:
package com.wgs.jms.consumer;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * Created by GenshenWang.nomico on 2017/10/20. */public class ActiveMQConsumerMain {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");}}2. JMS Topic model code implementation:
Just change the queueDestination that appears in the above code to topicDestination.
Summarize
The above is all the content of this article about the introduction of JMS and the actual code of ActiveMQ. I hope it will be helpful to everyone. Interested friends can continue to refer to other related topics on this site. If there are any shortcomings, please leave a message to point it out. Thank you friends for your support for this site!