1. Sitio web oficial de Alibaba Cloud --- Documento de ayuda
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Siga los pasos del sitio web oficial para crear un tema, solicite la publicación (productor) y solicite suscripción (consumidor)
2. Código
1. Configuración:
clase pública mqconfig {/** * Reemplace el siguiente xxx antes de comenzar la prueba de prueba */public static final public_topic = "test"; // public static final String public_producer_id = "pid_scheduler"; Public static final String public_consumer_id = "cid_service"; Public static final String access_key = "123"; public static final String Secret_key = "123"; public static final String tag = ""; public static final String string_num = "25"; // Número de hilos de consumo/*** onsaddr Por favor, configure de acuerdo con diferentes regiones* Prueba de red pública: http://onsaddrinternet.aliyun.com/rocketmq/nsaddr4client-internet* Public Cloud Production: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddrinternal.aliyun.com:8080/rocketmq/nsaddr4client-internal * shenzhen nube: shenzhen nube: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */public static final String onsaddr = "http://onsaddrinternal.aliyun.com:8080/rocketmq/nsaddr4client";}Onsaddr Alibaba Cloud utiliza la producción de nube pública, y las pruebas usan la red pública
Diferentes servicios pueden establecer diferentes etiquetas, pero si el volumen del mensaje es grande, se recomienda crear un nuevo tema.
2. Productor
Método 1:
Archivo de configuración: productor.xml
<? xml versión = "1.0" encoding = "utf-8"?> <! Doctype Beans public "-// spring // dtd bean // en" "http://www.springframework.org/dtd/spring-beanss.dtdd"> <beachs> <bean id = "productor" name = "Propiedades"> <Map> <Entrada Key = "ProducerId" Value = "" /> <!-Pid, por favor reemplace-> <Entrada Key = "AccessKey" Value = "" /> <!-Access_Key, por favor reemplace-> <Ingreso Key = "Secret Key" Value = " /> <!-Secret_Key, reemplace-> <!-PropertyConst.onsaddr Por favor, configure la prueba pública de la red de acuerdo con los diferentes regiones con los diferentes regiones. http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet nube pública producción: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal hangzhou financiera nube: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal shenzhen financiera nube: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsadr4client-internal-> <ing entring key = "onsaddr" value = "http://onsaddrinternal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </ Beans>
Método de inicio 1, establecido en la configuración global de la clase:
// Inicializar la aplicación privada del productor Context CTX; Productor de productores privados; @Value ("$ {producerConfig.enabled}") // switch, elemento de configuración de resorte, verdadero está encendido, falso apagar el booleano privado booleanConfigenabled; @PostConstruct public void init () {if (true == ProducerConFigeNable) {ctx = new ClassPathXMLAplaPlicationContext ("Producer.xml"); productor = (producterbean) ctx.getBean ("productor"); }}PD: Recientemente descubrí un pozo. Si el productor se inicia en el método anterior, una vez que comience más, causará FullGC. Por lo tanto, puede cambiar al siguiente método de anotación para comenzar manualmente y apagar dónde lo usa.
Método 2: Configurar clase (no se requiere XML)
@ConfigurationPublic de clase ProducerBeanConfig {@Value ("$ {OpenServices.ons.producerBean.ProducerId}") Private String ProducerId; @Value ("$ {OpenServices.ons.producerBean.accesskey}") privado String AccessKey; @Value ("$ {OpenServices.ons.producerBean.secretkey}") String private SecretKey; Productor de productores privados productores; @Value ("$ {OpenServices.ons.producerBean.onsaddr}") String private onSaddr; @Bean Public ProducerBean OneProducer () {ProducerBean ProducerBean = new ProducerBean (); Propiedades Propiedades = New Properties (); Properties.SetProperty (PropertyKeyConst.ProducerId, ProducerId); Properties.setProperty (PropertyKeyConst.AccessKey, AccessKey); Properties.setProperty (PropertyKeyConst.SecretKey, SecretKey); Properties.SetProperty (PropertyKeyConst.onsaddr, Onsaddr); ProducerBean.SetProperties (Propiedades); Return ProducerBean; }}PD: Después de este doble 11, se descubrió que los dos métodos anteriores no son muy adecuados para un gran volumen de datos y situaciones de múltiples subprocesos, y el rendimiento es muy pobre, por lo que se recomienda usar 3.
Método 3: (no se requiere XML)
@ComponentPublic ProducterBeAnbeSingleton {@Value ("$ {OpenServices.ons.producerBean.ProducerId}") Private String ProducerId; @Value ("$ {OpenServices.ons.producerBean.accesskey}") privado String AccessKey; @Value ("$ {OpenServices.ons.producerBean.secretkey}") String private SecretKey; @Value ("$ {OpenServices.ons.producerBean.onsaddr}") String private onSaddr; productor privado de productor estático; Singletonholder de clase estática privada {Instancia de ProducerBeSingleton de PRIVED STICTIC FinalSingLeton = New ProducerBeAnsingLeton (); } Private ProducerBeAnsingLeton () {} public static final ProducerBeAnbeSingleton getInstance () {return Singletonholder.Instance; } @PostConstruct public void init () {// Propiedades de inicialización de configuración de la instancia del productor Propiedades = new Properties (); // Propiedades de identificación del productor.setProperty (PropertyKeyConst.ProducerId, ProducerId); // AccessKey Alibaba Cloud Authentication, crea Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); // autenticación de nube de secretkey alibaba, crear propiedades.setProperty (PropertyKeyConst.SecretKey, SecretKey); // autenticación de nube de secretkey alibaba, crear propiedades.setProperty (PropertyKeyConst.SecretKey, SecretKey); // Establecer el tiempo de tiempo de espera de envío, la unidad MilliseCond Properties.setProperty (PropertyKeyConst.SendmSgTimeOutmillis, "3000"); // Establezca el nombre de dominio de acceso TCP (consulte el entorno de producción de nube pública como ejemplo aquí) Properties.setProperty (PropertyKeyConst.onsaddr, ONSADDR); productor = OnsFactory.CreateProDucer (propiedades); // Antes de enviar un mensaje, debe llamar al método de inicio para iniciar el productor, y solo necesita llamarlo una vez al productor.start (); } productor público getProDucer () {return productor; }}Configuración de resorte
spring.jpa.properties.Hibernate.Dialect = org.hibernate.dialect.mysql5dialectConsumerconfig.enabled = trueProDucerConfig.enabled = true #method 1: programar.enabled = falso #método 2, 3: cohetmq /u516c/u7f51/u914d/u7f6eopenservices.ons.producerbean.producerid = Pidopenservices.ons.producerbean.accesskey = openservices.ons.producerbean.sssecretkey = openservices.ons.producerbean.secretkey = OpenServices.ons.ProducerBean.onsaddr = Public Network, Hangzhou Public Cloud Production
Método 1 Entregue el código de mensaje:
intente {string jsonc = jsonutils.tojson (onceNMessage); Mensaje mensaje = nuevo mensaje (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = producer.send (mensaje); if (sendResult! = null) {logger.info (". Enviar MQ Message Success!";} else {logger.warn (". SendResult is Null ...");}} Catch (Exception e) {logger.warn ("DoubleVeallPreservice"); Thread.slele (1000); // si hay una excepción, Sleep para 1 segundo}}}}Método 2 Código de mensaje de entrega: (se puede iniciar/cerrarse cada 1000 veces)
producerBean.start (); try {string jsonc = jsonutils.tojson (onceMessage); Mensaje mensaje = nuevo mensaje (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = producer.send (mensaje); if (sendResult! = null) {logger.info (". Enviar MQ Message Success!";} else {logger.warn (". SendResult is Null ...");}} Catch (Exception e) {logger.warn ("DoubleVeallPreseService"); Thread.slele (1000); // if hay una excepción, Sleep para 1 segundo}}}Método 3: entregar el mensaje
intente {string jsonc = jsonutils.tojson (onceNMessage); Mensaje mensaje = nuevo mensaje (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); Productor productor = producerBeAnsingleton.getInstance (). GetProDucer (); SendResult sendResult = producer.send (mensaje); if (sendResult! = null) {logger.info ("doubleLevenmidService.send MQ Message Success! Topic es:";} else {logger.warn ("doubleEvenmidService.sendResult es nulo ...");}}} (excepción e) {logger.error ("doublevenmidservice es nulo ... "+e.getMessage (), e); hilo.sleep (1000); // Si hay una excepción, duerme por 1 segundo}El código que envía el mensaje debe captar la excepción, de lo contrario se enviará repetidamente.
El tema aquí es creado por usted mismo. ElevenMessage es el contenido que se enviará. Soy el objeto que creé por mí mismo.
3. Consumidores
Configurar la clase de inicio:
@Configuration@condyalonproperty (value = "consumerConfig.enabled", teniendoValue = "true", matchifMissing = true) public class consumerConfig {private logger logger = loggerFactory.getLogger (logGerAppenderType.smsdist.name ()); @Bean Public Consumer ConsumerFactory () {// Los consumidores diferentes no pueden cambiar el nombre de las propiedades aquí ConsumerProperties = new Properties (); ConsumerProperties.SetProperty (PropertyKeyConst.ConsumerID, mqconfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.AccessKey, mqconfig.access_key); ConsumerProperties.SetProperty (PropertyKeyConst.SecretKey, mqconfig.secret_key); //consumerProperties.setProperty(PropertyKeyConst.consumethreadnums.mqconfig.thread_num); ConsumerProperties.SetProperty (PropertyKeyConst.onsaddr, mqconfig.onsaddr); Consumer Consumer = OnsFactory.CreateConsumer (ConsumerProperties); Consumer.subscribe (mqconfig.topic, mqconfig.tag, new DoubleLevenMessageListener ()); // nuevo oyente correspondiente consumidor.start (); logger.info ("ConsumerConfig Start Success"); devolver consumidor; }}Debe elegir el CID y ONSADDR correctos. Puede configurarlo aquí utilizando su propio conteo de subprocesos de consumo, etc.
Cree una clase de oyente de mensajes y consuma mensajes:
@ComponentPublic MessageListener implementa MessageListener {private logger logger = loggerFactory.getLogger ("recordar"); EleveneReposit Static protegido ElevenReposit; @Resource public void setelevenReposit (ElevenRposit ElevenReposit) {MessageListener .ElevenRposit = ElevenReposit; } @Override Public Action Consute (Mensaje Mensaje, ConsumerContext ConsumeContext) {if (Message.GetTópico (). Equals ("propio Tema")) {// Evite el consumo de otros mensajes JSON Errores de conversión Prueba {byte [] body = message.getBody (); Cadena res = nueva cadena (cuerpo); // res es el contenido de mensaje enviado por el productor // código de negocio} else {logger.warn ("!"); }} Catch (Exception e) {logger.error ("MessageListener.consume Error:" + e.getMessage (), e); } logger.info ("MessageListener.Receive Mensaje"); // Si desea probar la función de repostación de mensajes, puede reemplazar Action.CommitMessage con Action.Reconsumelater Devuelve Action.CommitMessage; } else {logger.warn (); Return Action.Reconsumelater; }}Tenga en cuenta que, dado que los consumidores están multiprocesos, el objeto debe inyectarse con Static+SET para elevar el nivel de objeto al proceso, de modo que se puedan compartir múltiples hilos, pero no se pueden llamar a los métodos y variables de la clase principal.
El estado del consumidor puede verificar si el consumidor está conectado con éxito, si el consumo se retrasa, la velocidad de consumo, etc.
Restablecer el sitio de consumo puede borrar todos los mensajes
3. cosas a tener en cuenta
1. El cuerpo de mensaje máximo enviado es 256 kb
2. El mensaje existe por hasta 3 días
3. El número predeterminado de hilos en el lado del consumidor es de 20
4. Si Java cuelga o CPU ocupa una cantidad extremadamente alta durante la ejecución, puede enviar el hilo por 1s de cada 1,000 mensajes al enviarlo.
5. Cuando las pruebas locales o el inicio, reemplace ONSADDR con una red pública, de lo contrario no se iniciará el error.
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.