1. Preface
Recently, the company has the need to use Alibaba Cloud message queue. In order to make it more convenient to use, I have spent a few days encapsulating the message queue into API calling method to facilitate the call of the internal system. It has been completed now. Here we record the process and the relevant technologies used, and share with you.
Alibaba Cloud now provides two message services: mns service and ons service. I think mns is a simplified version of ons, and mns message consumption requires custom polling strategies. In contrast, ons' publishing and subscription mode functions are more powerful (for example, compared with mns, ons provides message tracking, logging, monitoring and other functions), and its API is more convenient to use. It has also been heard that Alibaba will no longer develop mns in the future, but only maintain it. Ons service will gradually replace mns service and become the main product of Alibaba's message service. Therefore, if there is a need to use message queues, it is recommended not to use mns again. Using ons is the best choice.
Techniques involved: Spring, reflection, dynamic proxy, Jackson serialization and deserialization
Before reading the following article, you need to read the above documentation to understand the relevant concepts (Topic, Consumer, Producer, Tag, etc.) and the simple sending and receiving code implementations provided in the documentation.
This blog post is only for friends who have a knowledge base on message queues. I am naturally very happy to help everyone. Don’t scold anyone who doesn’t understand it, as it means that your path is wrong.
2. Design plan
1. Message sending
In a simple CSS architecture, assuming that the server will listen to the message sent by a Topic Producer, it should first provide a client API. The client only needs to simply call the API and can produce messages through the producer.
2. Message reception
Since the API is formulated by the server, the server of course also knows how to consume these messages.
In this process, the server actually plays the role of consumers, and the client actually plays the role of producers, but the rules for producers to produce messages are formulated by consumers to meet consumer consumption needs.
3. The ultimate goal
We want to create a separate jar package named queue-core to provide specific implementations of dependencies and publish subscriptions for producers and consumers.
3. Message sending
1. Consumers provide interfaces
@Topic(name="kdyzm",producerId="kdyzm_producer")public interface UserQueueResource { @Tag("test1") public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user); @Tag("test2") public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);}Since Topic and producer are in N:1 relationship, producerId is directly used as a property of Topic; Tag is a very critical filtering condition, and consumers use it to classify messages to perform different business processing, so Tag is used as routing condition here.
2. Producer sends messages using the API provided by the consumer
Since consumers only provide interfaces for producers to use, there is no way to use interfaces directly because there is no way to instantiate them. Here we use dynamic proxy to generate objects. In the API provided by consumers, add the following config to facilitate producers to directly import config and use it. Here we use spring config based on java. Please know.
@Configurationpublic class QueueConfig { @Autowired @Bean public UserQueueResource userQueueResource() { return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class); }}3. Encapsulation of queue-core for the producer's message sending
All the annotations in 1 above (Topic, Tag, Body, Key) and the QueueResourceFactory classes used in 2 must be defined in queue-core. The definition of the annotation only defines the rules. The real implementation is actually in QueueResourceFactory.
import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.Producer;import com.aliyun.openservices.ons.api.SendResult;import com.wy.queue.core.api.MQConnection;import com.wy.queue.core.utils.JacksonSerializer;import com.wy.queue.core.utils.MQUtils;import com.wy.queue.core.utils.QueueCoreSpringUtils;public class QueueResourceFactory implements InvocationHandler { private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class); private String topicName; private String producerId; private JacksonSerializer serializer=new JacksonSerializer(); private static final String PREFIX="PID_"; public QueueResourceFactory(String topicName,String producerId) { this.topicName = topicName; this.producerId=producerId; } public static <T> T createProxyQueueResource(Class<T> clazz) { String topicName = MQUtils.getTopicName(clazz); String producerId = MQUtils.getProducerId(clazz); T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(), new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId)); return target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(args.length == 0 || args.length>1){ throw new RuntimeException("only accept one param at queueResource interface."); } String tagName=MQUtils.getTagName(method); ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class); MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId); //Send message Message msg = new Message( // // The Topic created in the console, that is, the Topic name to which the message belongs. connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // It can be understood as a tag in Gmail, and the message is reclassified to facilitate Consumer to specify filtering conditions to filter tagName on the MQ server, // Message Body // Any binary form of data, MQ does not interfere with any, // Producer and Consumer are required Negotiate a consistent serialization and deserialization method serializer.serialize(args[0]).getBytes()); SendResult sendResult = producer.send(msg); logger.info("Send Message success. Message ID is: " + sendResult.getMessageId()); return null; } }Here we have specially posted the custom package and the package names used by third parties to facilitate the distinction.
What exactly are done here?
The process of sending a message is to create a proxy object on the dynamic proxy. The object will be intercepted when calling the method. First, parse all annotations, such as topicName, producerId, tag and other key information from the annotations, and then call Alibaba SDK to send the message. The process is very simple, but note that when sending messages here, it is divided into environments. Generally speaking, the enterprise now distinguishes three environments: QA, staging, and product. Among them, QA and staging are test environments. For message queues, there are also three rings. In the environment, however, QA and staging environments often use the same Alibaba account to reduce costs, so the created topic and productId will be placed in the same area. In this way, TopicName with the same name is not allowed to exist, so environment prefix is added to distinguish them, such as QA_TopicName, PID_Staging_ProducerId, etc.; in addition, queue-core provides an MQConnection interface to obtain configuration information, and producer services only need to implement this interface.
4. Producer sends messages
@Autowired private UserQueueResource userQueueResource; @Override public void sendMessage() { UserModel userModel=new UserModel(); userModel.setName("kdyzm"); userModel.setAge(25); userQueueResource.handleUserInfo(userModel); }Just a few lines of code are needed to send the message to the specified Topic, which is much thinner than the native sending code.
4. News consumption
Compared with message sending, message consumption is more complicated.
1. Message consumption design
Since Topic and Consumer are N:N relationship, ConsumerId is placed on the consumer's specific implementation method
@Controller@QueueResourcepublic class UserQueueResourceImpl implements UserQueueResource { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override @ConsumerAnnotation("kdyzm_consumer") public void handleUserInfo(UserModel user) { logger.info("Message 1 received: {}", new Gson().toJson(user)); } @Override @ConsumerAnnotation("kdyzm_consumer1") public void handleUserInfo1(UserModel user) { logger.info("Message 2 received: {}", new Gson().toJson(user)); }}Here are two new annotations @QueueResource and @ConsumerAnnotation. These two annotations will be discussed in the future. Someone may ask me why I should use the name ConsumerAnnotation instead of the name Consumer, because the name Consumer conflicts with the name in the SDK provided by Aliyun. . . .
Here, consumers provide the API interface to producers to facilitate the producers to send messages, and consumers implement the interface to consume messages sent by producers. How to implement the API interface is to implement monitoring, which is a relatively critical logic.
2.queue-core implements the core logic of message queue listening
Step 1: Use the listening method of the spring container to obtain all beans with QueueResource annotations
Step 2: Distribute the processing beans
How to deal with these beans? Each bean is actually an object. With an object, such as the UserQueueResourceImpl object in the above example, we can get the interface bytecode object implemented by the object, and then get the annotations on the interface UserQueueRerousce and the annotations on the methods and methods. Of course, the annotations on the implementation method of the UserQueueResourceImpl can also be obtained. Here I will use consumerId as the key, and the remaining relevant information is encapsulated as Value and cached into a Map object. The core code is as follows:
Class<?> clazz = resourceImpl.getClass(); Class<?> clazzIf = clazz.getInterfaces()[0]; Method[] methods = clazz.getMethods(); String topicName = MQUtils.getTopicName(clazzIf); for (Method m : methods) { ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class); if (null == consumerAnno) {// logger.error("method={} need Consumer annotation.", m.getName()); continue; } String consumerId = consumerAnno.value(); if (StringUtils.isEmpty(consuerId)) { logger.error("method={} ConsumerId can't be null", m.getName()); continue; } Class<?>[] parameterTypes = m.getParameterTypes(); Method resourceIfMethod = null; try { resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes); } catch (NoSuchMethodException | SecurityException e) { logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(), e); continue; } String tagName = MQUtils.getTagName(resourceIfMethod); consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m)); }Step 3: Consumption actions through reflection
First, determine the timing of the reflection action execution, that is, listen to new messages
Secondly, how to perform reflection actions? I won’t go into details. Children’s shoes with reflection-related foundations know how to make them. The core code is as follows:
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); String topicPrefix=connectionInfo.getPrefix()+"_"; String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_"; for(String consumerId:consumersMap.keySet()){ MethodInfo methodInfo=consumersMap.get(consumerId); Properties connectionProperties=convertToProperties(connectionInfo); // Consumer ID you created in the console connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId); Consumer consumer = ONSFactory.createConsumer(connectionProperties); consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //Subscribe to multiple Tag public Action consumption(Message message, ConsumerContext context) { try { String messageBody=new String(message.getBody(),"UTF-8"); logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody); Method method=methodInfo.getMethod(); Class<?> parameterType = method.getParameterTypes()[0]; Object arg = jacksonSerializer.deserialize(messageBody, parameterType); Object[] args={arg}; method.invoke(resourceImpl, args); } catch (Exception e) { logger.error("",e); } return Action.CommitMessage; } }); consumer.start(); logger.info("consumer={} has started.",consumerIdPrefix+consumerId); }5. See the git link below for the complete code
https://github.com/kdyzm/queue-core.git
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.