1. Prefacio
Recientemente, la compañía tiene la necesidad de usar la cola de mensajes de la nube de Alibaba. Para que sea más conveniente de usar, he pasado unos días encapsulando la cola de mensajes en el método de llamadas de API para facilitar la llamada del sistema interno. Se ha completado ahora. Aquí registramos el proceso y las tecnologías relevantes utilizadas, y compartimos con usted.
Alibaba Cloud ahora proporciona dos servicios de mensajes: servicio MNS y servicio ONS. Creo que MNS es una versión simplificada de ONS, y el consumo de mensajes de MNS requiere estrategias de votación personalizadas. Por el contrario, las funciones de modo de publicación y suscripción de ONS son más potentes (por ejemplo, en comparación con MNS, ONS proporciona seguimiento de mensajes, registro, monitoreo y otras funciones), y su API es más conveniente de usar. También se ha escuchado que Alibaba ya no desarrollará MN en el futuro, sino que solo lo mantendrá. El servicio ONS reemplazará gradualmente el servicio MNS y se convertirá en el producto principal del servicio de mensajes de Alibaba. Por lo tanto, si es necesario usar colas de mensajes, se recomienda no usar MNS nuevamente. Usar ONS es la mejor opción.
Técnicas involucradas: primavera, reflexión, proxy dinámico, serialización y deserialización de Jackson
Antes de leer el siguiente artículo, debe leer la documentación anterior para comprender los conceptos relevantes (tema, consumidor, productor, etiqueta, etc.) y las simples implementaciones de código de envío y recepción proporcionadas en la documentación.
Esta publicación de blog es solo para amigos que tienen una base de conocimiento en las colas de mensajes. Estoy naturalmente muy feliz de ayudar a todos. No regañen a nadie que no lo entienda, ya que significa que su camino está mal.
2. Plan de diseño
1. Envío de mensajes
En una arquitectura CSS simple, suponiendo que el servidor escuche el mensaje enviado por un productor de temas, primero debe proporcionar una API de cliente. El cliente solo necesita simplemente llamar a la API y puede producir mensajes a través del productor.
2. Recepción de mensajes
Dado que la API está formulada por el servidor, el servidor, por supuesto, también sabe cómo consumir estos mensajes.
En este proceso, el servidor en realidad desempeña el papel de los consumidores, y el cliente realmente desempeña el papel de los productores, pero las reglas para que los productores produzcan mensajes están formulados por los consumidores para satisfacer las necesidades de consumo de los consumidores.
3. El objetivo final
Queremos crear un paquete jar separado llamado Queue-Core para proporcionar implementaciones específicas de dependencias y publicar suscripciones para productores y consumidores.
3. Envío de mensajes
1. Los consumidores proporcionan interfaces
@Topic (name = "kdyzm", producerId = "kdyzm_producer") interfaz pública userQueueResource {@tag ("test1") public void handleUserInfo (@Body @Key ("UserInfoHandler") userModel user); @Tag ("test2") public void handleUserInfo1 (@Body @Key ("UserInfoHandler1") UserModel User);}Dado que el tema y el productor están en la relación N: 1, el producterID se usa directamente como propiedad del tema; La etiqueta es una condición de filtrado muy crítica, y los consumidores la usan para clasificar mensajes para realizar diferentes procesos comerciales, por lo que la etiqueta se usa como condición de enrutamiento aquí.
2. El productor envía mensajes utilizando la API proporcionada por el consumidor
Dado que los consumidores solo proporcionan interfaces para que los productores lo usen, no hay forma de usar interfaces directamente porque no hay forma de instanciarlas. Aquí usamos proxy dinámico para generar objetos. En la API proporcionada por los consumidores, agregue la siguiente configuración para facilitar a los productores que importen directamente la configuración y la use. Aquí usamos la configuración de Spring basada en Java. Por favor sepa.
@ConfigurationPublic Class QueueConFig {@aUtowired @Bean Public UserqueueResource UserqueueResource () {return queueResourceFactory.CreateProxyqueueeResource (userqueueResource.class); }}3. Encapsulación de colas para el envío del mensaje del productor
Todas las anotaciones en 1 anteriores (tema, etiqueta, cuerpo, clave) y las clases de QueueresourceFactory utilizadas en 2 deben definirse en la cola. La definición de la anotación solo define las reglas. La implementación real está en realidad en QueuerSourceFactory.
import java.lang.reflect.invocationHandler; import java.lang.reflect.method; import java.lang.reflect.proxy; import org.slf4j.logger; importar org.slf4j.loggerfactory; import com.aliyun.openservices.ons.api.message; import com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.api.sendresult; import com.wy.queue.core.api.mqconnection; import com.wy.queue.core.utils.jacksonserializer; import com.wy.queue.core.core.util.s.mtils; importación; importación; importación; importación; com.wy.queue.core.utils.queueCorespringutils; public class queueResourceFactory implementa InvocationHandler {private estático final de logger logger = loggerFactory.getLogger (queueResourceFactory.class); String privado TopicName; Cadena privada ProducerId; Private JacksonSerializer Serializer = nuevo JackSonserializer (); Private static final String prefix = "Pid_"; public queueResourceFactory (String TopicName, String ProducerId) {this.topicName = topicName; this.producerId = ProducerId; } public static <t> t createProxyqueuereResource (class <t> clazz) {String topicName = mqUtils.getTopicName (clazz); String producerId = mqUtils.getProducerId (clazz); T Target = (t) proxy.newproxyInstance (queueResourceFactory.class.getClassLoader (), nueva clase <?> [] {Clazz}, New QueuerSourceFactory (TopicName, ProducerId)); objetivo de retorno; } @Override public Object Invoke (Proxy de objeto, método Método, objeto [] args) lanza lando {if (args.length == 0 || args.length> 1) {throw new runtimeException ("solo acepte un parámetro en la interfaz de QueuerSource."); } String tagName = mqutils.gettagname (método); ProducerFactory ProducerFactory = queueCorespringUtilss.getBean (ProducerFactory.Class); MQConnection ConnectionInfo = queueCorespringUtils.getBean (mqconnection.class); Productor productor = producerFactory.CreateProDucer (prefijo+conexionInfo.getPrefix ()+"_"+ProducerId); // Enviar mensaje Mensaje msg = nuevo mensaje (// // El tema creado en la consola, es decir, el nombre del tema al que pertenece el mensaje. ConnectionInfo.getPrefix ()+"_"+TopicName, // etiqueta de mensaje, // se puede entender como una etiqueta en gMail, y el mensaje se reclace en el servidor de los mensajes de la etiqueta. Forma de datos, MQ no interfiere con ninguno, // se requiere que el productor y el consumidor negocien un método de serialización y deserialización consistente serializador.serialize (args [0]). getBytes ()); SendResult sendResult = producer.send (msg); logger.info ("Enviar el éxito del mensaje. ID de mensaje es:" + sendResult.getMessageId ()); regresar nulo; }}Aquí hemos publicado especialmente el paquete personalizado y los nombres de los paquetes utilizados por terceros para facilitar la distinción.
¿Qué se hacen exactamente aquí?
El proceso de enviar un mensaje es crear un objeto proxy en el proxy dinámico. El objeto será interceptado al llamar al método. Primero, analice todas las anotaciones, como TopicName, ProducerId, TAG y otra información clave de las anotaciones, y luego llame a Alibaba SDK para enviar el mensaje. El proceso es muy simple, pero tenga en cuenta que al enviar mensajes aquí, se divide en entornos. En términos generales, la empresa ahora distingue tres entornos: QA, puesta en escena y producto. Entre ellos, el control de calidad y la puesta en escena se encuentran entornos de prueba. Para las colas de mensajes, también hay tres anillos. Sin embargo, en el entorno, los entornos de QA y en estadificación a menudo usan la misma cuenta de Alibaba para reducir los costos, por lo que el tema creado y el ProductID se colocarán en la misma área. De esta manera, el nombre de Topicname con el mismo nombre no puede existir, por lo que se agrega el prefijo del entorno para distinguirlos, como Qa_TopicName, PID_staging_producerid, etc.; Además, Queue-Core proporciona una interfaz MQConnection para obtener información de configuración, y los servicios de productores solo necesitan implementar esta interfaz.
4. El productor envía mensajes
@AUTOWIREDIREDIREDEDREDEDECULE DE USUROUROURCE USERQUEUERSOURCE; @Override public void sendMessage () {usermodel usermodel = new Usermodel (); usermodel.setName ("kdyzm"); usermodel.setage (25); userQueueResource.handleUserInfo (usermodel); }Solo se necesitan unas pocas líneas de código para enviar el mensaje al tema especificado, que es mucho más delgado que el código de envío nativo.
4. Consumo de noticias
En comparación con el envío de mensajes, el consumo de mensajes es más complicado.
1. Diseño de consumo de mensajes
Dado que el tema y el consumidor son la relación N: N, ConsumerId se coloca en el método de implementación específico del consumidor
@Controller@QueuerSourCePublic UserqueueResourceImpl implementa UserQueueResource {private logger logger = loggerFactory.getLogger (this.getClass ()); @Override @ConsumERAnnotation ("kdyzm_consumer") public void handleUserInfo (userModel user) {logger.info ("Mensaje 1 recibido: {}", nuevo gson (). Tjson (user)); } @Override @ConsumERAnnotation ("kdyzm_consumer1") public void manyUserInfo1 (userModel user) {logger.info ("Mensaje 2 recibido: {}", new Gson (). Tojson (usuario)); }}Aquí hay dos nuevas anotaciones @QueueResource y @ConsumerAnnotation. Estas dos anotaciones se discutirán en el futuro. Alguien puede preguntarme por qué debería usar el nombre ConsumeRanNotation en lugar del consumidor del nombre, porque el nombre del consumidor entra en conflicto con el nombre en el SDK proporcionado por Aliyun. . . .
Aquí, los consumidores proporcionan la interfaz API a los productores para facilitar a los productores que envíen mensajes, y los consumidores implementan la interfaz para consumir mensajes enviados por los productores. Cómo implementar la interfaz API es implementar el monitoreo, que es una lógica relativamente crítica.
2. Core de cola implementa la lógica central de la escucha de la cola de mensajes
Paso 1: Use el método de escucha del contenedor de resorte para obtener todos los frijoles con anotaciones de QueuerSource
Paso 2: distribuir los granos de procesamiento
¿Cómo lidiar con estos frijoles? Cada frijol es en realidad un objeto. Con un objeto, como el objeto UserQueueReSourceImpl en el ejemplo anterior, podemos obtener el objeto Bytecode de interfaz implementado por el objeto, y luego obtener las anotaciones en la interfaz UserQueueererouse y las anotaciones en los métodos y métodos. Por supuesto, también se pueden obtener las anotaciones sobre el método de implementación del usuario de UserqueueResourceImpl. Aquí usaré ConsumerId como clave, y la información relevante restante se encapsula como valor y almacena en caché en un objeto de mapa. El código central es el siguiente:
Class <?> Clazz = ResourceImpl.getClass (); Clase <?> Clazzif = clazz.getInterfaces () [0]; Método [] métodos = clazz.getMethods (); Cadena TopicName = mqutils.gettopicName (Clazzif); para (método m: métodos) {consumerAnnotation consumeRanno = m.getAnnotation (consumeRanNotation.class); if (null == ConsumeRanno) {// logger.error ("método = {} necesita anotación de consumo.", m.getName ()); continuar; } String ConsumerId = ConsumeRanno.Value (); if (stringUtils.isEmpty (consuerid)) {logger.error ("método = {} ConsumerId no puede ser nulo", m.getName ()); continuar; } Class <?> [] Parametertypes = m.getParametertypes (); Método recocesceifmethod = null; intente {resourceifMethod = clazzif.getMethod (m.getName (), parametertypes); } catch (nosuchmethodException | SecurityException e) {logger.error ("No se puede encontrar método = {} en super interfaz = {}.", m.getName (), clazzif.getCanonicalName (), e); continuar; } String tagName = mqutils.gettagname (resourceifmethod); ConsumersMap.put (consuerid, nuevo MethodInfo (TopicName, TagName, M)); }Paso 3: Acciones de consumo a través de la reflexión
Primero, determine el momento de la ejecución de la acción de reflexión, es decir, escuche nuevos mensajes
En segundo lugar, ¿cómo realizar acciones de reflexión? No entraré en detalles. Los zapatos de los niños con bases relacionadas con el reflejo saben cómo hacerlos. El código central es el siguiente:
MQConnection ConnectionInfo = queueCorespringUtils.getBean (mqconnection.class); String topicPrefix = ConnectionInfo.getPrefix ()+"_"; String ConsumerIdPrefix = prefix+ConnectionInfo.getPrefix ()+"_"; for (String ConsumerId: ConsumersMap.KeySet ()) {MethodInfo MethodInfo = ConsumersMap.get (ConsumerId); Propiedades ConnectionProperties = ConvertTOproperties (ConnectionInfo); // ID de consumidor que creó en la consola ConnectionProperties.put (PropertyKeyConst.ConsumerID, ConsumerIdPrefix+ConsumerId); Consumer Consumer = OnsFactory.CreateConsumer (ConnectionProperties); consumidor.subscribe (topicPrefix+metodInfo.getTopicName (), metodInfo.getTagName (), new MessageListener () {// suscríbete a la etiqueta múltiple consumo de acciones públicas (mensaje de mensaje, contexto de consumo) {try {string MessageBody = New String (Message.getBody () "Utf-8); Logger. topic = {}, tag = {}, consumerId = {}, message = {} ", topicPrefix+metodInfo.getTopicName (), metodInfo.gettagName (), consumo de consumo) Objeto Arg = JackSonserialize.Deserialize (MessageBody, ParametType); Consumer.Start (); logger.info ("Consumer = {} ha comenzado.", ConsumerIdPrefix+ConsumerId); }5. Vea el enlace Git a continuación para ver el código completo
https://github.com/kdyzm/queue-core.git
Lo anterior es todo el contenido de este artículo. Espero que sea útil para el aprendizaje de todos y espero que todos apoyen más a Wulin.com.