1. Alibaba Cloud official website---Help document
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh
Follow the steps of the official website to create a Topic, apply for publishing (producer), and apply for subscription (consumer)
2. Code
1. Configuration:
public class MqConfig { /** * Please replace the following XXX before starting the test */ public static final String PUBLIC_TOPIC = "test";//Public static final String PUBLIC_PRODUCER_ID = "PID_SCHEDULER"; public static final String PUBLIC_CONSUMER_ID = "CID_SERVICE"; public static final String ACCESS_KEY = "123"; public static final String SECRET_KEY = "123"; public static final String TAG = ""; public static final String THREAD_NUM = "25";//Number of consumer threads/** * ONSADDR Please configure according to different regions* Public network test: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet * Public cloud production: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";}ONSADDR Alibaba Cloud uses public cloud production, and tests use public network
Different services can set different tags, but if the message volume is large, it is recommended to create a new TOPIC.
2. Producer
Method 1:
Configuration file: producer.xml
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd"><beans> <bean id="producer" init-method="start" destroy-method="shutdown"> <property name="properties"> <map> <entry key="ProducerId" value="" /> <!-- PID, please replace --> <entry key="AccessKey" value="" /> <!-- ACCESS_KEY, please replace --> <entry key="SecretKey" value="" /> <!-- SECRET_KEY, please replace --> <!--PropertyKeyConst.ONSAddr Please configure the public network test according to different regions: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet Public cloud production: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal --> <entry key="ONSAddr" value="http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean></beans>
Startup method 1, set in the global setting of the class:
//Initialize the producer private ApplicationContext ctx; private ProducerBean producer; @Value("${producerConfig.enabled}")//Switch, spring configuration item, true is on, false turn off private boolean producerConfigEnabled; @PostConstruct public void init(){ if (true == producerConfigEnabled) { ctx = new ClassPathXmlApplicationContext("producer.xml"); producer = (ProducerBean) ctx.getBean("producer"); } }PS: I recently discovered a pit. If the producer is started in the above method, once it starts more, it will cause fullGC. Therefore, you can change to the following annotation method to start manually and shutdown where you use it.
Method 2: Configure class (no xml required)
@Configurationpublic class ProducerBeanConfig { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; private ProducerBean producerBean; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; @Bean public ProducerBean oneProducer() { ProducerBean producerBean = new ProducerBean(); Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.ProducerId, producerId); properties.setProperty(PropertyKeyConst.AccessKey, accessKey); properties.setProperty(PropertyKeyConst.SecretKey, secretKey); properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producerBean.setProperties(properties); return producerBean; }}PS: After this Double 11, it was found that the above two methods are not very suitable for large data volume and multi-threading situations, and the performance is very poor, so it is recommended to use 3.
Method 3: (No XML is required)
@Componentpublic class ProducerBeanSingleTon { @Value("${openservices.ons.producerBean.producerId}") private String producerId; @Value("${openservices.ons.producerBean.accessKey}") private String accessKey; @Value("${openservices.ons.producerBean.secretKey}") private String secretKey; @Value("${openservices.ons.producerBean.ONSAddr}") private String ONSAddr; private static Producer producer; private static class SingletonHolder { private static final ProducerBeanSingleTon INSTANCE = new ProducerBeanSingleTon(); } private ProducerBeanSingleTon (){} public static final ProducerBeanSingleTon getInstance() { return SingletonHolder.INSTANCE; } @PostConstruct public void init(){ // producer instance configuration initialization Properties properties = new Properties(); //Producer ID properties.setProperty(PropertyKeyConst.ProducerId, producerId); // AccessKey Alibaba Cloud authentication, create properties.setProperty(PropertyKeyConst.AccessKey, accessKey); // SecretKey Alibaba Cloud authentication, create properties.setProperty(PropertyKeyConst.SecretKey, secretKey); // SecretKey Alibaba Cloud authentication, create properties.setProperty(PropertyKeyConst.SecretKey, secretKey); //Set the send timeout time, unit millisecond properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // Set the TCP access domain name (see the public cloud production environment as an example here) properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producer = ONSFactory.createProducer(properties); // Before sending a message, you must call the start method to start the Producer, and you only need to call it once to producer.start(); } public Producer getProducer(){ return producer; }}Spring configuration
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5DialectconsumerConfig.enabled = trueproducerConfig.enabled = true #Method 1: scheduleing.enabled = false#Method 2, 3: rocketMQ /u516C/u7F51/u914D/u7F6Eopenservices.ons.producerBean.producerId = pidopenservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.ONSAddr = Public network, Hangzhou public cloud production
Method 1 Deliver the message code:
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!"; } else { logger.warn(".sendResult is null......"); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//If there is an exception, sleep for 1 second}Method 2 Delivery message code: (can be started/closed every 1000 times)
producerBean.start();try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info(".Send mq message success!"; } else { logger.warn(".sendResult is null......"); } } catch (Exception e) { logger.warn("DoubleElevenAllPreService"); Thread.sleep(1000);//If there is an exception, sleep for 1 second} producerBean.shutdown();Method 3: Deliver the message
try { String jsonC = JsonUtils.toJson(elevenMessage); Message message = new Message(MqConfig.TOPIC, MqConfig.TAG, jsonC.getBytes()); Producer producer = ProducerBeanSingleTon.getInstance().getProducer(); SendResult sendResult = producer.send(message); if (sendResult != null) { logger.info("DoubleElevenMidService.Send mq message success! Topic is:"; } else { logger.warn("DoubleElevenMidService.sendResult is null......"); } } catch (Exception e) { logger.error("DoubleElevenMidService Thread.sleep 1 s___error is "+e.getMessage(), e); Thread.sleep(1000);//If there is an exception, sleep for 1 second}The code that sends the message must catch the exception, otherwise it will be sent repeatedly.
The TOPIC here is created by yourself. ElevenMessage is the content to be sent. I am the object I created by myself.
3. Consumers
Configure the startup class:
@Configuration@ConditionalOnProperty(value = "consumerConfig.enabled", havingValue = "true", matchIfMissing = true)public class ConsumerConfig { private Logger logger = LoggerFactory.getLogger(LoggerAppenderType.smsdist.name()); @Bean public Consumer consumerFactory(){//Different consumers cannot rename Properties here consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, MqConfig.CONSUMER_ID); consumerProperties.setProperty(PropertyKeyConst.AccessKey, MqConfig.ACCESS_KEY); consumerProperties.setProperty(PropertyKeyConst.SecretKey, MqConfig.SECRET_KEY); //consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums,MqConfig.THREAD_NUM); consumerProperties.setProperty(PropertyKeyConst.ONSAddr, MqConfig.ONSADDR); Consumer consumer = ONSFactory.createConsumer(consumerProperties); consumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new DoubleElevenMessageListener()); //new corresponding listener consumer.start(); logger.info("ConsumerConfig start success."); return consumer; }}You need to choose the right CID and ONSADDR. You can configure it here by using your own, consumer thread count, etc.
Create a message listener class and consume messages:
@Componentpublic class MessageListener implements MessageListener { private Logger logger = LoggerFactory.getLogger("remind"); protected static ElevenReposit elevenReposit; @Resource public void setElevenReposit(ElevenReposit elevenReposit){ MessageListener .elevenReposit=elevenReposit; } @Override public Action consumption(Message message, ConsumerContext consumeContext) { if(message.getTopic().equals("Own TOPIC")){//Avoid consuming other messages json conversion errors try { byte[] body = message.getBody(); String res = new String(body); //res is the message content sent by the producer//Business code}else{ logger.warn("!"); } } catch (Exception e) { logger.error("MessageListener.consume error:" + e.getMessage(), e); } logger.info("MessageListener.Receive message"); //If you want to test the function of message reposting, you can replace Action.CommitMessage with Action.ReconsumeLater return Action.CommitMessage; }else{ logger.warn(); return Action.ReconsumeLater; } }Note that since consumers are multi-threaded, the object needs to be injected with static+set to raise the object level to the process, so that multiple threads can be shared, but the methods and variables of the parent class cannot be called.
Consumer status can check whether the consumer is connected successfully, whether the consumption is delayed, consumption speed, etc.
Resetting the consumption site can clear all messages
3. Things to note
1. The maximum message body sent is 256KB
2. The message exists for up to 3 days
3. The default number of threads on the consumer side is 20
4. If Java hangs up or CPU occupies an extremely high amount during the run, you can send the thread for 1s of every 1,000 messages when sending it.
5. When local testing or startup, replace ONSADDR with a public network, otherwise the error will not be started.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.