1. Vorwort
In letzter Zeit muss das Unternehmen Alibaba Cloud Message Queue verwenden. Um die Verwendung bequemer zu gestalten, habe ich einige Tage damit verbracht, die Meldungswarteschlange in die API -Aufrufmethode einzudämmen, um den Ruf des internen Systems zu erleichtern. Es wurde jetzt abgeschlossen. Hier erfassen wir den Prozess und die relevanten Technologien und teilen Sie mit Ihnen.
Alibaba Cloud bietet jetzt zwei Nachrichtendienste an: MNS -Service und ONS -Service. Ich denke, MNS ist eine vereinfachte Version von ONS, und MNS -Nachrichtenkonsum erfordert benutzerdefinierte Wahlstrategien. Im Gegensatz dazu sind ONS -Funktionen für Veröffentlichungs- und Abonnementmodus leistungsfähiger (z. B. im Vergleich zu MNS bietet ONS Nachrichtenverfolgung, Protokollierung, Überwachung und andere Funktionen), und seine API ist bequemer zu bedienen. Es wurde auch gehört, dass Alibaba in Zukunft keine MNS mehr entwickeln wird, sondern nur beibehalten wird. Der ONS -Service ersetzt nach und nach den MNS -Service und wird zum Hauptprodukt des Message Service von Alibaba. Wenn daher die Verwendung von Nachrichtenwarteschlangen verwendet werden muss, wird empfohlen, MNS nicht erneut zu verwenden. Die Verwendung von ONS ist die beste Wahl.
Beteiligte Techniken: Frühling, Reflexion, dynamischer Proxy, Serialisierung und Deserialisierung von Jackson
Bevor Sie den folgenden Artikel lesen, müssen Sie die obige Dokumentation lesen, um die relevanten Konzepte (Thema, Verbraucher, Produzent, Tag usw.) und die in der Dokumentation bereitgestellten einfachen Senden- und Empfangscode -Implementierungen zu verstehen.
Dieser Blog -Beitrag ist nur für Freunde gedacht, die eine Wissensbasis auf Nachrichtenwarteschlangen haben. Ich freue mich von Natur aus sehr, allen zu helfen. Schalten Sie niemanden, der es nicht versteht, da dies bedeutet, dass Ihr Weg falsch ist.
2. Designplan
1. Nachricht senden
In einer einfachen CSS -Architektur sollte der Server die von einem Themenproduzent gesendete Nachricht anhören, sondern zunächst eine Client -API bereitstellen. Der Client muss nur die API aufrufen und Nachrichten über den Produzenten erstellen.
2. Nachrichtenempfang
Da die API vom Server formuliert wird, weiß der Server natürlich auch, wie diese Nachrichten konsumiert werden.
In diesem Prozess spielt der Server tatsächlich die Rolle der Verbraucher, und der Kunde spielt tatsächlich die Rolle der Hersteller, aber die Regeln für die Erzeugung von Nachrichten werden von den Verbrauchern formuliert, um den Verbraucherverbrauch zu decken.
3. Das ultimative Ziel
Wir möchten ein separates JAR-Paket namens Queue-Core erstellen, um bestimmte Implementierungen von Abhängigkeiten bereitzustellen und Abonnements für Hersteller und Verbraucher zu veröffentlichen.
3. Meldung senden
1. Verbraucher stellen Schnittstellen zur Verfügung
@Topic (name = "kdyzm", processerId = "kdyzm_producer") public interface userQueueresource {@Tag ("test1") public void HandleSerInfo (@body @key ("userInfohandler") UsModel -Benutzer); @Tag ("test2") public void HandleSerInfo1 (@body @Key ("userInfohandler1") UsModel -Benutzer);}Da Thema und Produzent in N: 1 -Beziehung sind, wird ProducerID direkt als Eigenschaft des Themas verwendet. Tag ist eine sehr kritische Filterbedingung, und Verbraucher verwenden sie, um Nachrichten zu klassifizieren, um eine andere Geschäftsabwicklung durchzuführen. Daher wird das Tag hier als Routing -Bedingung verwendet.
2. Der Produzent sendet Nachrichten über die vom Verbraucher bereitgestellte API
Da Verbraucher nur Schnittstellen für die Verwendung von Herstellern zur Verfügung stellen, gibt es keine Möglichkeit, Schnittstellen direkt zu verwenden, da es keine Möglichkeit gibt, sie zu instanziieren. Hier verwenden wir dynamische Proxy, um Objekte zu generieren. Fügen Sie in der von den Verbrauchern bereitgestellten API die folgende Konfiguration hinzu, um die Hersteller dazu zu erleichtern, die Konfiguration direkt zu importieren und zu verwenden. Hier verwenden wir Spring Config basierend auf Java. Bitte wissen Sie.
@ConfigurationPublic Class QueueConfig {@autowired @Bean public userQuUeresource userQuEeresource () {return queueresourceFactory.CreateProxyqueueresource (userqueueresource.class); }}3.. Kapselung des Warteschlangenkerns für die Sende des Herstellers des Herstellers
Alle Anmerkungen in 1 oben (Thema, Tag, Körper, Schlüssel) und die in 2 verwendeten Queueresourcefactory-Klassen müssen in Warteschlangenkern definiert werden. Die Definition der Annotation definiert nur die Regeln. Die eigentliche Implementierung ist tatsächlich in Queueresourcefactory.
importieren java.lang.reflect.invocationHandler; import java.lang.reflect.method; import Java.lang.reflect.proxy; import org.slf4j.logger; com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.sendresult; importieren com.wy.queue.core.api.mqconnection; import com.wy.queue.utils.jacksonsserialisator; Imports; com.wy.queue.core.utils.queuecorespringutils; öffentliche Klasse queueresourceFactory implements InvocationHandler {private statische endgültige Logger -Logger = Loggerfactory.getLogger (QueueresourceFactory.Class); private Zeichenfolge Topicname; private String -Produzent; Private Jacksonserializer Serializer = neuer Jacksonserializer (); private statische endgültige Zeichenfolge prefix = "pid_"; public queueresourceFactory (String topicname, String processerID) {this.topicName = topicname; this.ProducerId = produzent; } public static <t> t createReProxyqueueresource (Klasse <T> clazz) {String topicname = mqutils.gettopicname (clazz); String produzent = mqutils.getProcerId (Clazz); T target = (t) proxy.newProxyInstance (QueueresourceFactory.class.getClassloader (), New Class <?> [] {Clazz}, New QueeresourceFactory (Topicname, Produzent)); Ziel zurückgeben; } @Override public Object Invoke (Object Proxy, Method -Methode, Object [] args) löscht Throwable {if (args.Length == 0 || args.length> 1) {neue RunTimeException ("Nur einen Param an der Queueresource -Schnittstelle akzeptieren."); } String tagname = mqutils.gettagname (Methode); ProducerFactory ProducerFactory = QueuecorespringUtils.getbean (ProducerFactory.Class); MQConnection ConnectionInfo = Queuecorespringutils.getbean (mqConnection.class); Produzent produzierer = produzierfaktorisch // Nachrichtennachricht senden msg = new meldung (// // Das in der Konsole erstellte Thema, dh der Themename, zu dem die Nachricht gehört. ConnectionInfo.getPrefix ()+"_"+themenname, // Nachrichten -Tag, // Es kann als ein Tag in Gmail verstanden werden. Binäre Form von Daten, MQ stört keiner, / / Produzent und Verbraucher müssen eine konsistente Serialisierung und Deserialisierungsmethode serializer verhandeln. Serializer. SendResult sendResult = produzierer.send (msg); logger.info ("Nachrichtenerfolg senden. Nachrichten -ID lautet:" + sendResult.getMessageId ()); null zurückkehren; }}Hier haben wir speziell das benutzerdefinierte Paket und die von Dritten verwendeten Paketnamen veröffentlicht, um die Unterscheidung zu erleichtern.
Was genau wird hier gemacht?
Der Prozess des Sendens einer Nachricht besteht darin, ein Proxy -Objekt auf dem dynamischen Proxy zu erstellen. Das Objekt wird beim Aufrufen der Methode abgefangen. Nehmen Sie zuerst alle Anmerkungen wie Topicname, ProducerID, Tag und andere wichtige Informationen aus den Anmerkungen an und rufen Sie dann Alibaba SDK an, um die Nachricht zu senden. Der Vorgang ist sehr einfach, aber beachten Sie, dass er beim Senden von Nachrichten hier in Umgebungen unterteilt ist. Im Allgemeinen unterscheidet das Unternehmen jetzt drei Umgebungen: QA, Inszenierung und Produkt. Unter ihnen sind QA und Staging Testumgebungen. Für Nachrichtenwarteschlangen gibt es auch drei Ringe. In der Umgebung verwenden QA- und Staging -Umgebungen jedoch häufig das gleiche Alibaba -Konto, um die Kosten zu senken, sodass das geschaffene Thema und das Produktid in denselben Bereich platziert werden. Auf diese Weise darf Topicname mit demselben Namen nicht existieren. Daher wird das Präfix der Umgebung hinzugefügt, um sie zu unterscheiden, wie z. B. qa_topicname, pid_staging_producerid usw.; Darüber hinaus bietet Queue-Core eine MQConnection-Schnittstelle zur Erfassung von Konfigurationsinformationen, und die Produzentendienste müssen diese Schnittstelle nur implementieren.
4. Produzent sendet Nachrichten
@Autowired private userQueueresource userqueueresource; @Override public void sendMessage () {UsModel userModel = new UserModel (); UsModel.SetName ("kdyzm"); UsModel.Setage (25); userQueueresource.HandLEUSERINFO (UsModel); }Nur ein paar Codezeilen werden benötigt, um die Nachricht an das angegebene Thema zu senden, was viel dünner ist als der native Sendungscode.
4. Nachrichtenkonsum
Im Vergleich zum Senden von Nachrichten ist der Nachrichtenkonsum komplizierter.
1. Message Consumption Design
Da Thema und Verbraucher N: N -Beziehung sind, wird ConsumerID auf die spezifische Implementierungsmethode des Verbrauchers gestellt
@Controller@queueresourcePublic class UserQueueresourcEImpl implementiert UserQuEeresource {private logger logger = loggerfactory.getLogger (this.getClass ()); @Override @consumerannotation ("kdyzm_consumer") public void handleSerinfo (UsModel -Benutzer) {logger.info ("Nachricht 1 empfangen: {}", new gson (). Tojson (user)); } @Override @ConsumerAnnotation ("kdyzm_consumer1") public void HandleSerInfo1 (UsModel -Benutzer) {logger.info ("Nachricht 2 empfangen: {}", New GSON (). TOJSON (Benutzer)); }}Hier sind zwei neue Anmerkungen @queueresource und @Consumerannotation. Diese beiden Anmerkungen werden in Zukunft diskutiert. Jemand kann mich fragen, warum ich den Namen ConsumerAnnotation anstelle des Namens -Verbrauchers verwenden sollte, da der Name Verbraucher mit dem Namen in der von Aliyun bereitgestellten SDK in Konflikt steht. . . .
Hier stellen die Verbraucher den Produzenten die API -Schnittstelle zur Verfügung, um den Produzenten zum Senden von Nachrichten zu erleichtern, und die Verbraucher implementieren die Schnittstelle, um Nachrichten zu konsumieren, die von den Produzenten gesendet werden. Die Implementierung der API -Schnittstelle ist die Implementierung der Überwachung, eine relativ kritische Logik.
2.queue-core implementiert die Kernlogik der Meldungswarteschlange zuzuhören
Schritt 1: Verwenden Sie die Hörmethode des Federbehälter
Schritt 2: Verbreiten Sie die Verarbeitungsbohnen
Wie gehe ich mit diesen Bohnen um? Jede Bohne ist eigentlich ein Objekt. Mit einem Objekt wie dem userQueueresourcEImpl -Objekt im obigen Beispiel können wir das vom Objekt implementierte Schnittstellen -Bytecode -Objekt und dann die Annotationen auf der Schnittstelle userqueuerousce und Anmerkungen zu den Methoden und Methoden erhalten. Natürlich können auch die Anmerkungen zur Implementierungsmethode des userqueueresourcEImpl erhalten werden. Hier werde ich ConsumerID als Schlüssel verwenden, und die verbleibenden relevanten Informationen werden als Wert eingekapselt und in ein Kartenobjekt abgeschnitten. Der Kerncode lautet wie folgt:
Class <?> Clazz = ressourceImpl.getClass (); Klasse <?> Clazzif = clazz.getInterfaces () [0]; Methode [] methods = clazz.getMethods (); String topicname = mqutils.gettopicname (clazzif); für (Methode M: Methoden) {ConsumerAnnotation Consumeranno = M.Getannotation (ConsumerAnnotation.Class); if (null == ConsumerAnno) {// logger.error ("method = {} Benötige Verbraucherannotation.", M.GetName ()); weitermachen; } String ConsumerID = ConsumerAnno.Value (); if (stringutils.isempty (conuerId)) {logger.Error ("method = {} conspecterID kann nicht null sein", M.GetName ()); weitermachen; } Class <?> [] ParameterTypes = M.GetParameterTypes (); Methode ressourceneifMethod = null; try {ressourceneifMethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (NoSuchMethodException | SecurityException e) {Logger.Error ("nicht find method = {} at super interface = {}.", m.getName (), clazzif.getCanonicalName (), e); weitermachen; } String tagname = mqutils.gettagname (ressourceneifMethod); Consumersmap.put (Conserid, New MethodInfo (Topicname, TagName, m)); }Schritt 3: Verbrauchsaktionen durch Reflexion
Bestimmen Sie zunächst den Zeitpunkt der Ausführung der Reflexionsaktion, das heißt, neue Nachrichten hören
Zweitens, wie kann man Reflexionsaktionen ausführen? Ich werde nicht auf Details eingehen. Kinderschuhe mit Reflexionsfundamenten wissen, wie man sie macht. Der Kerncode lautet wie folgt:
MQConnection ConnectionInfo = Queuecorespringutils.getbean (mqConnection.class); String topicPrefix = ConnectionInfo.getPrefix ()+"_"; String ConsumerIDPrefix = Präfix+ConnectionInfo.getPrefix ()+"_"; für (String ConsumerID: Consumersmap.Keyset ()) {methodInfo methodInfo = Consumersmap.get (ConsumerID); Properties ConnectionProperties = convertToproperties (ConnectionInfo); // Verbraucher -ID, die Sie im Console ConnectionProperties.put (PropertyKeyConst.consumerid, ConsumerIDPrefix+ConsumerID) erstellt haben; Consumer Consumer = ONSFactory.CreateConsumer (ConnectionProperties); Consumer. topic = {}, tag = {}, ConsumerID = {}, message = {} ", topicprefix+methodInfo.gettopicname (), methodInfo.gettAgname (), ConsumerIDPrefix+Consumerid, MessageBody); method.getParameterTypes () [0]; Consumer.Start (); logger.info ("Consumer = {} hat gestartet.", ConsumerIDPrefix+conpenerID); }5. Weitere Informationen finden Sie im GIT -Link unten für den vollständigen Code
https://github.com/kdyzm/queue-core.git
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.