1. Официальный веб-сайт Alibaba Cloud --- справочный документ
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Следите за шагами официального веб -сайта, чтобы создать тему, подать заявку на публикацию (продюсер) и подавать заявку на подписку (потребитель)
2. Код
1. Конфигурация:
открытый класс MQConfig {/** * Пожалуйста, замените следующий XXX, прежде чем начать тест */public static final String public_topic = "test"; // public Static Final String public_producer_id = "pid_scheduler"; public Static Final String public_consumer_id = "cid_service"; Public Static Final String Access_key = "123"; Public Static Final String Secret_key = "123"; Public Static Final String Tag = ""; Публичная статическая конечная строка Thread_num = "25"; // Количество потребительских потоков/*** onsaddr, пожалуйста, настройте в соответствии с различными регионами* Общедоступный сетевой тест: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet* public cloud Производство: Производство: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client internal * shenzhen financial: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */public static final String onsaddr = "http://onsaddr-internal.aliyun.com:8080/Rocketmq/nsaddr4client internal";Onsaddr Alibaba Cloud использует производство общественного облака, а тесты используют публичную сеть
Различные сервисы могут устанавливать разные теги, но если объем сообщения большой, рекомендуется создать новую тему.
2. Производитель
Метод 1:
Файл конфигурации: Производитель.xml
<? xml version = "1.0" Encoding = "UTF-8"?> <! Doctype Beans public "-// Spring // dtd bean // en" "http://www.springframework.org/dtd/spring-beans.dtd"> <beans> <bean id = "init method intemethod" inteplod "intemetod" inteplowd-metd "inteplowd-motod" inteplowod "<beans> <bean id = name = "Свойства"> <Ap> <intpirt key = "production" value = "" /> <!-PID, пожалуйста, замените-> <intpirt key = "accessKey" value = "" /> <!-access_key, пожалуйста, замените-> <intpirt key = "secretkey" value = " /> <!-secret_key, пожалуйста, замените-> <!-PropertyKeyConSt.SADDR, пожалуйста, Publication vities at vectients vitietions. http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet Public Cloud Production: http://onsaddr-internal.aliyun.com:8080/Rocketmq/nsaddr4client-internal financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Shenzhen Финансовый облако: http://mq4finance-sz.addr.aliyun.com:8080/Rocketmq/nsaddr4client-internal-> <intysaddr "ons-ysaddr" onsdaddr "ons-ysadddr" ons-ysdddr "ons-insaddr" ons-ysdddr "ons-ysdddr" ons-ysddddr " value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </beans>
Метод 1 запуска 1, установленного в глобальном настройке класса:
// Инициализировать производитель private ApplicationContext CTX; Частный продюсер -производитель; @Value ("$ {uffecterconfig.enabled}") // Переключатель, элемент конфигурации пружины, true включен, false Off Private Boolean Producterconfigenabled; @Postconstruct public void init () {if (true == ProducterConfigenabled) {ctx = new classpathxmlapplicationContext ("Производитель.xml"); Производитель = (продюсер) ctx.getbean («Производитель»); }}PS: Я недавно обнаружил яму. Если производитель запускается в приведенном выше методе, как только он начнется больше, он вызовет FullGC. Поэтому вы можете перейти на следующий метод аннотации, чтобы запустить вручную и выключить, где вы его используете.
Метод 2: Настройка класса (не требуется XML)
@ConfigurationPublic Class Productsconconfig {@Value ("$ {openservices.ons.producerbean.producerid}") Private String Promegrid; @Value ("$ {openservices.ons.producerbean.accesskey}") private string accesskey; @Value ("$ {openservices.ons.producerbean.secretkey}") Private String SecretKey; Частный продюсер -продюсер; @Value ("$ {openservices.ons.producerbean.onsaddr}") частная строка onsaddr; @Bean Public Produceerbean OneProducer () {Производитель продюсера = New ProduceerBean (); Свойства свойства = новые свойства (); Properties.SetProperty (PropertyKeyConst.Producerid, производительность); Properties.SetProperty (PropertyKeyConst.Accesskey, AccessKey); Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); Properties.SetProperty (PropertyKeyConst.OnsAddr, OnsAddr); Производитель. SetProperties (свойства); возвращаемый продюсер; }}PS: После этого Double 11 было обнаружено, что два вышеупомянутые методы не очень подходят для больших объемов данных и многопоточных ситуаций, и производительность очень плохая, поэтому рекомендуется использовать 3.
Метод 3: (XML не требуется)
@ComponentPublic Class Producebeansingleton {@Value ("$ {openservices.ons.producerbean.producerid}") private String Promegled; @Value ("$ {openservices.ons.producerbean.accesskey}") private string accesskey; @Value ("$ {openservices.ons.producerbean.secretkey}") Private String SecretKey; @Value ("$ {openservices.ons.producerbean.onsaddr}") частная строка onsaddr; частный статический производитель; Private Static Class Singletonholder {Private Static Final Producebeansingleton Extance = New Productbeansingleton (); } private Producebeansingleton () {} public Static Final Producebeansingleton getInstance () {return singletonholder.instance; } @Postconstruct public void init () {// Свойства инициализации экземпляра производителя свойства = новые свойства (); // Свойства идентификатора производителя.setProperty (PropertyKeyConst.Producerid, производительность); // AccessKey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.Accesskey, AccessKey); // SecretKey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // SecretKey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // Установить время отправки, единица Millisecond Properties.SetProperty (PropertyKeyConst.sendmsgtimeOutmillis, "3000"); // Установите имя домена доступа TCP (см. Среду производства общественного облака в качестве примера здесь) Properties.SetProperty (PropertyKeyConst.onsaddr, OnsAddr); Производитель = onsfactory.createProducer (свойства); // Перед отправкой сообщения вы должны вызвать метод начала, чтобы запустить производителя, и вам нужно только позвонить в продюсер.start (); } public Producer getProducer () {return Producer; }}Конфигурация пружины
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectconsumererconfig.enabled = trueproducerconfig.enabled = true #method 1: Depareming.enabled = false #Method 2, 3: RocketMq /u516c/u7f51/u914d/u7f6eopenservices.ons.producerbean.producerid = pidopenservices.ons.producerbean.accesskey = openservices.ons.secerbean. openservices.ons.producerbean.onsaddr = public network, Hangzhou Public Cloud Production
Метод 1 Доставьте код сообщения:
try {string jsonc = jsonutils.tojson (elevenmessage); Сообщение сообщения = новое сообщение (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); SendResult SendResult = Produce.Send (сообщение); if (sendresult! = null) {logger.info (".. Отправить успех сообщения MQ!";} else {logger.warn (". SendResult is null ......");}} Catch (Exception e) {logger.warn ("DayelevenallPreserv"); Thread.sepe (1000);Метод 2 Код сообщения доставки: (можно запускать/закрывать каждые 1000 раз)
produceerbean.start (); try {string jsonc = jsonutils.tojson (elevenmessage); Сообщение сообщения = новое сообщение (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); SendResult SendResult = Produce.Send (сообщение); if (sendresult! = null) {logger.info (".. Отправить успех сообщения MQ!";} else {logger.warn (". SendResult is null ......");}} Catch (Exception e) {logger.warn ("dayelevenallPreserv"); Thread.sepe (1000);//если есть исключение, Sleep 1 seThineBINER);Метод 3: Доставьте сообщение
try {string jsonc = jsonutils.tojson (elevenmessage); Сообщение сообщения = новое сообщение (mqconfig.topic, mqconfig.tag, jsonc.getbytes ()); Производитель продюсер = Производитель Beaningleton.getInstance (). GetProducer (); SendResult SendResult = Produce.Send (сообщение); if (sendresult! = null) {logger.info ("doubleelevenmidservice.send Mq Message Success! Тема:";} else {logger.warn ("dowelevenmidservice.sendresult is null ......");}} Catch (Exception e) {logger.error ("Downelevinevice retraderse. "+e.getMessage (), e); thread.sleep (1000); // Если есть исключение, сон на 1 секунду}Код, который отправляет сообщение, должен поймать исключение, в противном случае он будет отправляться неоднократно.
Тема здесь создана самим собой. ElevenMessage - это контент, который будет отправлен. Я тот объект, который я создал сам.
3. Потребители
Настройте класс запуска:
@Configuration@condityalonproperty (value = "consumerconfig.enabled", hailvalue = "true", matchifmissing = true) public cunesterconfig {private logger = loggerfactory.getLogger (loggerAppenderType.smsdist.name ()); @Bean Public Consumer ConsumerFactory () {// Различные потребители не могут переименовать свойства здесь ConsumerProperties = new Properties (); ConsumerProperties.SetProperty (PropertyKeyConst.Consumerid, MQConfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.Accesskey, mqconfig.access_key); ConsumerProperties.SetProperty (PropertyKeyConst.SecretKey, MQConfig.Secret_key); //consumerproperties.setproperty(propertykeyconst.consumethreadnums, mqconfig.thread_num); ConsumerProperties.SetProperty (PropertyKeyConst.OnsAddr, MQConfig.onsaddr); Потребительский потребитель = onsfactory.createConsumer (ConmerceProperties); Consumer.subscribe (mqconfig.topic, mqconfig.tag, new Doolelevenmessagelistener ()); // новый соответствующий слушатель consumer.start (); logger.info ("ConsumperConfig Start Success."); вернуть потребителя; }}Вам нужно выбрать правильный CID и ONSADDR. Вы можете настроить его здесь, используя свой собственный, количество потребительских потоков и т. Д.
Создайте класс прослушивателя сообщений и потребляйте сообщения:
@Componentpublic class messagelistener реализует Messagelistener {private logger logger = loggerfactory.getLogger ("напомнить"); Защищенные статические ElevenReposit ElevenReposit; @Resource public void setelevenreposit (elevenreposit elevenreposit) {messagelistener .elevenreposit = elevenreposit; } @Override Public Action Потребление действия (сообщение сообщения, ConsumerContext ConsumeContext) {if (message.getTopic (). Equals ("собственная тема")) {// избегание употребления других сообщений. String res = new String (body); // res - это содержимое сообщения, отправленное производителем // бизнес -кодом} else {logger.warn ("!"); }} catch (Exception e) {logger.error ("messagelistener.consume error:" + e.getmessage (), e); } logger.info ("messagelistener.receive message"); // Если вы хотите проверить функцию репостирования сообщений, вы можете заменить Action.commitmessage на action.reconsumelater return Action.commitmessage; } else {logger.warn (); return action.reconsumelater; }}Обратите внимание, что, поскольку потребители многопоточные, объект должен быть введен со статичным+установленным для повышения уровня объекта к процессу, чтобы можно было использовать несколько потоков, но методы и переменные родительского класса не могут быть вызваны.
Статус потребителя может проверить, успешно ли потребитель подключен, является ли потребление отложено, скорость потребления и т. Д.
Сброс сайта потребления может очистить все сообщения
3. Что следует отметить
1. Максимальное отправленное тело сообщения составляет 256 КБ
2. Сообщение существует в течение 3 дней
3. Количество потоков по умолчанию на стороне потребителя составляет 20
4. Если Java висит трубку или процессор занимает чрезвычайно высокую сумму во время пробега, вы можете отправить поток за 1 с каждыми 1000 сообщений при его отправке.
5. При локальном тестировании или запусках замените ONSADDR публичной сетью, в противном случае ошибка не будет начата.
Выше всего содержание этой статьи. Я надеюсь, что это будет полезно для каждого обучения, и я надеюсь, что все будут поддерживать Wulin.com больше.