1. Préface
Récemment, l'entreprise a besoin d'utiliser la file d'attente de messages cloud Alibaba. Afin de le rendre plus pratique à utiliser, j'ai passé quelques jours à résumer la file d'attente de messages dans une méthode d'appel de l'API pour faciliter l'appel du système interne. Il est terminé maintenant. Ici, nous enregistrons le processus et les technologies pertinentes utilisées et partageons avec vous.
Alibaba Cloud fournit désormais deux services de message: Service MNS et service ONS. Je pense que MNS est une version simplifiée des ONS, et la consommation de messages MNS nécessite des stratégies de sondage personnalisées. En revanche, les fonctions de mode de publication et d'abonnement des ONS sont plus puissantes (par exemple, par rapport à MNS, ONS fournit le suivi des messages, la journalisation, la surveillance et d'autres fonctions), et son API est plus pratique à utiliser. Il a également été entendu qu'Alibaba ne développera plus de MNS à l'avenir, mais ne le maintiendra que. Le service ONS remplacera progressivement le service MNS et deviendra le principal produit du service de messages d'Alibaba. Par conséquent, s'il est nécessaire d'utiliser des files d'attente de messages, il est recommandé de ne plus utiliser MNS. L'utilisation d'ONS est le meilleur choix.
Techniques impliquées: printemps, réflexion, proxy dynamique, sérialisation Jackson et désérialisation
Avant de lire l'article suivant, vous devez lire la documentation ci-dessus pour comprendre les concepts pertinents (sujet, consommateur, producteur, balise, etc.) et les implémentations de code d'envoi et de réception simples fournies dans la documentation.
Ce billet de blog est uniquement pour des amis qui ont une base de connaissances sur les files d'attente de messages. Je suis naturellement très heureux d'aider tout le monde. Ne grondez personne qui ne le comprend pas, car cela signifie que votre chemin est faux.
2. Plan de conception
1. Envoi de message
Dans une architecture CSS simple, en supposant que le serveur écoutera le message envoyé par un producteur de sujet, il devrait d'abord fournir une API client. Le client n'a qu'à simplement appeler l'API et peut produire des messages via le producteur.
2. Réception des messages
Étant donné que l'API est formulée par le serveur, le serveur sait bien sûr également comment consommer ces messages.
Dans ce processus, le serveur joue réellement le rôle des consommateurs, et le client joue réellement le rôle des producteurs, mais les règles que les producteurs peuvent produire des messages sont formulées par les consommateurs pour répondre aux besoins de consommation des consommateurs.
3. Le but ultime
Nous voulons créer un package JAR distinct nommé file d'attente pour fournir des implémentations spécifiques de dépendances et publier des abonnements pour les producteurs et les consommateurs.
3. Envoi de message
1. Les consommateurs fournissent des interfaces
@Topic (name = "kdyzm", producerid = "kdyzm_producer") interface publique userQueueResource {@tag ("test1") public void handleUserInfo (@body @key ("userInfoHandler") userodel userodel); @Tag ("test2") public void handleUserInfo1 (@body @key ("userInfoHandler1") UserModel User);}Étant donné que le sujet et le producteur sont en relation N: 1, Producerid est directement utilisé comme propriété de sujet; TAG est une condition de filtrage très critique, et les consommateurs l'utilisent pour classer les messages pour effectuer différents traitements commerciaux, donc la balise est utilisée comme condition de routage ici.
2. Le producteur envoie des messages à l'aide de l'API fournie par le consommateur
Étant donné que les consommateurs ne fournissent que des interfaces aux producteurs, il n'y a aucun moyen d'utiliser directement les interfaces car il n'y a aucun moyen de les instancier. Ici, nous utilisons le proxy dynamique pour générer des objets. Dans l'API fournie par les consommateurs, ajoutez la configuration suivante pour faciliter les producteurs pour importer directement la configuration et l'utiliser. Ici, nous utilisons la configuration Spring basée sur Java. Sachez.
@Configurationpublic class queueConfig {@autowired @bean public userQueueResource userQueueResource () {return queueResourceFactory.createProxyqueueResource (userQueueResource.class); }}3. Encapsulation de la file d'attente pour le message du producteur
Toutes les annotations dans 1 ci-dessus (sujet, balise, corps, clé) et les classes de queueresourcefactory utilisées en 2 doivent être définies dans la file d'attente. La définition de l'annotation définit uniquement les règles. La véritable implémentation est en fait dans Qu queueResourceFactory.
import java.lang.reflect.invocationhandler; import java.lang.reflect.method; import java.lang.reflect.proxy; import org.slf4j.logger; import org.slf4j.loggerfactory; import com.aliyun.openvices.ons.api.Message; importation; com.aliyun.openservices.ons.api.producer; import com.aliyun.openservices.ons.api.sendResult; import com.wy.queue.core.api.mqconnection; import com.wy.queue.core.utitils.jacksonSerializer; import com.wy.queue.core.core.utils.mqQues; com.wy.queue.core.utils.queueCoresPringUtils; public class queueResourceFactory implémente invocationhandler {private static final logger logger = loggerfactory.getLogger (queueResourceFactory.class); STRING PRIVÉE TUBRENAME; Producerid de cordes privées; Serializer JackSonSerializer privé = new JackSonSerializer (); Private Static Final String Prefix = "PID_"; public queueResourceFactory (String topicName, String producerid) {this.topicName = topicName; this.producerid = producerid; } public static <T> t CreateProxyqueUeResource (class <T> Clazz) {String topicName = mQutils.getTopicName (Clazz); String producerid = mQutils.getProducerid (Clazz); T cible = (t) proxy.newproxyinstance (queueresourcefactory.class.getClassloadher (), new class <?> [] {Clazz}, new QueueResourceFactory (topicName, producerid)); cible de retour; } @Override public Object Invoke (Object Proxy, Method Method, Object [] args) lance Throwsable {if (args.length == 0 || args.length> 1) {throw new RuntimeException ("Accepter un seul param à l'interface QueueResource."); } String tagname = mQutils.getTagName (méthode); ProducerFactory produnerFactory = queueCoresPringUtils.getBean (producerFactory.class); MQConnection ConnectionInfo = queueCoresPringUtils.getBean (mqconnection.class); Producteur producteur = producerFactory.CreateProducer (Prefix + ConnectionInfo.getPrefix () + "_" + producerid); // Envoi du message Message msg = nouveau message (// // la rubrique créée dans la console, c'est-à-dire le nom de sujet auquel le message appartient. Forme binaire de données, MQ n'interfère pas avec aucun, // producteur et consommateur sont obligés de négocier une sérialisation cohérente et de la méthode de désérialisation Serializer.serialize (args [0]). GetBytes ()); SendResult sendResult = producer.send (msg); Logger.info ("Envoyer le message du message. retourner null; }}Ici, nous avons spécialement publié le package personnalisé et les noms de packages utilisés par des tiers pour faciliter la distinction.
Que se font exactement ici?
Le processus d'envoi d'un message est de créer un objet proxy sur le proxy dynamique. L'objet sera intercepté lors de l'appel de la méthode. Tout d'abord, analysez toutes les annotations, telles que TopicName, Producerid, Tag et d'autres informations clés des annotations, puis appelez Alibaba SDK pour envoyer le message. Le processus est très simple, mais notez que lors de l'envoi de messages ici, il est divisé en environnements. D'une manière générale, l'entreprise distingue désormais trois environnements: QA, mise en scène et produit. Parmi eux, l'AQ et la mise en scène sont des environnements de test. Pour les files d'attente de messages, il y a aussi trois anneaux. Dans l'environnement, cependant, l'AQ et les environnements de mise en scène utilisent souvent le même compte Alibaba pour réduire les coûts, de sorte que le sujet créé et le productide seront placés dans la même zone. De cette façon, le nom de sujet avec le même nom n'est pas autorisé à exister, donc le préfixe d'environnement est ajouté pour les distinguer, tels que QA_TOPICNAME, PID_STING_PRODUDERID, etc.; De plus, la file d'attente fournit une interface MQConnection pour obtenir des informations de configuration, et les services de producteur n'ont qu'à implémenter cette interface.
4. Le producteur envoie des messages
@Autowired Privy UserQueueResource UserQueueResource; @Override public void sendMessage () {userModel userModel = new UserModel (); userModel.setName ("KDYZM"); userModel.Setage (25); userQueueResource.HandleuseRinfo (UserModel); }Quelques lignes de code sont nécessaires pour envoyer le message au sujet spécifié, qui est beaucoup plus mince que le code d'envoi natif.
4. Consommation de nouvelles
Par rapport à l'envoi de messages, la consommation de messages est plus compliquée.
1. Conception de la consommation de messages
Étant donné que le sujet et le consommateur sont n: n relation, ConsumerId est placé sur la méthode de mise en œuvre spécifique du consommateur
@ Contrôleur @ queueresourcepublic classe userQueueResourceImpl implémente userQueueResource {private logger logger = loggerfactory.getLogger (this.getClass ()); @Override @ConsumeRannotation ("KDYZM_CONSUMER") public void handleUserInfo (UserModel User) {logger.info ("Message 1 reçu: {}", new gson (). Tojson (user)); } @Override @ConsumeRannotation ("KDYZM_CONSUMER1") public void handleUserInfo1 (UserModel User) {Logger.info ("Message 2 reçu: {}", new gson (). Tojson (user)); }}Voici deux nouvelles annotations @QueueResource et @Consumerannotation. Ces deux annotations seront discutées à l'avenir. Quelqu'un peut me demander pourquoi je devrais utiliser le nom Consumerannotation au lieu du nom du consommateur, car le nom de consommation est en conflit avec le nom du SDK fourni par Aliyun. . . .
Ici, les consommateurs fournissent l'interface API aux producteurs pour faciliter les producteurs pour envoyer des messages, et les consommateurs implémentent l'interface pour consommer des messages envoyés par les producteurs. Comment implémenter l'interface API consiste à implémenter la surveillance, qui est une logique relativement critique.
2.Queue-core implémente la logique principale de l'écoute de file d'attente de messages
Étape 1: Utilisez la méthode d'écoute du conteneur à ressort pour obtenir tous les haricots avec des annotations de queueresource
Étape 2: Distribuez les haricots de traitement
Comment gérer ces haricots? Chaque haricot est en fait un objet. Avec un objet, tel que l'objet UserQueueResourceImpl dans l'exemple ci-dessus, nous pouvons obtenir l'objet ByteCode d'interface implémenté par l'objet, puis obtenir les annotations sur l'interface userQueuereary et les annotations sur les méthodes et méthodes. Bien sûr, les annotations sur la méthode de mise en œuvre du UserQueueResourceImpl peuvent également être obtenues. Ici, j'utiliserai Consumerid comme clé, et les informations pertinentes restantes sont encapsulées comme valeur et mises en cache dans un objet MAP. Le code central est le suivant:
Class <?> Clazz = ResourceImpl.getClass (); Classe <?> Clazzif = Clazz.getInterfaces () [0]; Méthode [] Methods = Clazz.getMethods (); String topicName = mQutils.getTopicName (ClazzIF); pour (méthode m: méthodes) {Consumerannotation Consumeranno = M.GetAnnotation (Consumerannotation.class); if (null == ConsumerAnno) {// logger.error ("méthode = {} a besoin d'annotation des consommateurs.", M.GetName ()); continuer; } String ConsumerId = ConsumerAnno.Value (); if (stringUtils.isempty (consuerid)) {logger.error ("méthode = {} Consumerid ne peut pas être null", m.getName ()); continuer; } Class <?> [] ParameterTypes = M.GetParameterTypes (); Méthode ResourceIfMethod = null; try {ResourceIfMethod = Clazzif.getMethod (m.getName (), ParameterTypes); } catch (nosuchMetHodexception | SecurityException e) {logger.Error ("Impossible de trouver méthode = {} à super interface = {}.", M.GetName (), Clazzif.getCanonicalName (), e); continuer; } String tagname = mQutils.getTagName (ResourceIfMethod); ConsumersMap.put (Consuerid, new Methodinfo (topicName, tagname, m)); }Étape 3: Actions de consommation par la réflexion
Tout d'abord, déterminez le moment de l'exécution de l'action de réflexion, c'est-à-dire écouter de nouveaux messages
Deuxièmement, comment effectuer des actions de réflexion? Je n'entrerai pas dans les détails. Les chaussures pour enfants avec des fondations liées à la réflexion savent les fabriquer. Le code central est le suivant:
MQConnection ConnectionInfo = queueCoresPringUtils.getBean (mqconnection.class); String topicPrefix = ConnectionInfo.getPrefix () + "_"; String ConsumerIdPrefix = Prefix + ConnectionInfo.getPrefix () + "_"; pour (String ConsumerId: ConsumersMap.KeySet ()) {méthodyInfo Methodinfo = ConsumersMap.get (Consumerid); Propriétés ConnectionProperties = ConvertToproperties (ConnectionInfo); // Consumer ID que vous avez créé dans la console ConnectionProperties.put (propriétéKeyConst.ConsumeRid, ConsumerIdPrefix + Consumerid); Consumer Consumer = ONSFactory.CreateConsumer (ConnectionProperties); Consumer.Subscribe (topicPrefix + Methodinfo.getTopicName (), méthodyInfo.getTagName (), new MessageListener () {// Abonnez-vous à plusieurs balises publique (message message, "utf-8"); topic = {}, tag = {}, consumerid = {}, message = {} ", topicprefix + methodinfo.gettopicName (), methodinfo.gettagname (), consumeridprefix + consommationrid, messagebody); méthode méthode = méthode = Methand.getParameterTypeS () [0]; Consumer.start (); Logger.info ("Consumer = {} a commencé.", ConsumerIdPrefix + Consumerid); }5. Voir le lien Git ci-dessous pour le code complet
https://github.com/kdyzm/queue-core.git
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.