1. Offizielle Website von Alibaba Cloud --- HILFE DOCUMENT
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Befolgen Sie die Schritte der offiziellen Website, um ein Thema zu erstellen, sich für Veröffentlichungen (Produzent) zu bewerben und ein Abonnement (Verbraucher) zu beantragen,
2. Code
1. Konfiguration:
öffentliche Klasse MQConfig {/** * Bitte ersetzen Sie die folgenden XXX, bevor Sie mit dem Test beginnen public static final String public_consumer_id = "cid_service"; public static final String access_key = "123"; public static final String secry_key = "123"; public static final String tag = ""; public static Final String thread_num = "25"; // Anzahl der Verbraucher-Threads/*** Onsaddr Bitte konfigurieren Sie nach verschiedenen Regionen* öffentlicher Netzwerktest: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet* Public Cloud Produktion: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/Rocketmq/nsaden4clientin-internal * Shenzhenzen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */ public static final String ONSADDR = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal";}ONSADDR Alibaba Cloud verwendet die öffentliche Cloud -Produktion, und Tests nutzen das öffentliche Netzwerk
Verschiedene Dienste können unterschiedliche Tags festlegen, aber wenn das Nachrichtenvolumen groß ist, wird empfohlen, ein neues Thema zu erstellen.
2. Produzent
Methode 1:
Konfigurationsdatei: produzent.xml
<? name = "properties"> <map> <Eintragsetaste = "produzierer" value = "" /> <!-PID, bitte ersetzen Sie-> <Eintrags-Key = "AccessKey" value = " /> <!-Access_Key, bitte ersetzen Sie-> <eintrags key =" SecretKey "Value =" /> <! http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet Public Cloud Produktion: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/Rocketmq/nsaddr4client internal-ornsadmq/nsaddr4clientin-interal-> <eintriebs key = "usermq/nsaddr4clientin-interal-<eintriebs key =" tela value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </property> </bean> </beans>
Startmethode 1, festgelegt in der globalen Einstellung der Klasse:
// Initialisieren Sie den Produzenten private applicationContext ctx; privater Produzenthersteller; @Value ("$ {producerConfig.Enabled}") // Switch, Feder -Konfigurationselement, true eingeschaltet, false deaktiviert private boolean produconCon -formabled; @PostConstruct public void init () {if (true == producerConFigenabled) {ctx = new classPathXMLApplicationContext ("produzent.xml"); Produzent = (produzierterbean) ctx.getbean ("Produzent"); }}PS: Ich habe kürzlich eine Grube entdeckt. Wenn der Produzent in der obigen Methode gestartet wird und sobald er mehr beginnt, verursacht er FullGC. Daher können Sie die folgende Annotationsmethode ändern, um manuell zu starten und dort zu schalten, wo Sie sie verwenden.
Methode 2: Klasse konfigurieren (NO XML erforderlich)
@ConfigurationPublic Class ProducerBeanConfig {@Value ("$ {openServices.ons.ProducerBean.ProducerId}") private String producerID; @Value ("$ {openServices.ons.Producerbean.accesskey}") private String AccessKey; @Value ("$ {openServices.ons.Producerbean.secretkey}") private String SecretKey; Privatproduzent produzierterbean; @Value ("$ {openServices.ons.Producerbean.onsaddr}") private String onsaddr; @Bean Public ProducerBean OneProducer () {ProducerBean ProducerBean = New ProducerBean (); Eigenschaften Eigenschaften = neue Eigenschaften (); Properties.SetProperty (PropertyKeyConst.ProcerID, Produzent); Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); Properties.SetProperty (PropertyKeyConst.secretkey, SecretKey); Properties.SetProperty (PropertyKeyConst.onsaddr, Onsaddr); processerbean.setProperties (Eigenschaften); Return Producerbean; }}PS: Nach diesem Doppel 11 wurde festgestellt, dass die beiden oben genannten Methoden für große Datenvolumen und Multi-Threading-Situationen nicht sehr geeignet sind und die Leistung sehr schlecht ist, sodass es empfohlen wird, 3 zu verwenden.
Methode 3: (Es ist kein XML erforderlich)
@ComponentPublic Class ProducerBeansingleton {@Value ("$ {OpenServices.Ons.Producerbean.ProducerId}") private String producerID; @Value ("$ {openServices.ons.Producerbean.accesskey}") private String AccessKey; @Value ("$ {openServices.ons.Producerbean.secretkey}") private String SecretKey; @Value ("$ {openServices.ons.Producerbean.onsaddr}") private String onsaddr; privater statischer Produzenthersteller; private statische Klasse Singletonholder {private statische endgültige Produzentinsingleton Instance = New ProducerBeansingleton (); } private ProducerBeansingleton () {} public statische endgültige ProducerBeansingleton getInstance () {return Singletonholder.instance; } @Postconstruct public void init () {// Produzentinstanzkonfigurationsinitialisierungseigenschaften Eigenschaften = neue Eigenschaften (); // Producer ID Properties.setProperty (PropertyKeyConst.Procerid, Produzent); // AccessKey Alibaba Cloud -Authentifizierung, erstellen properties.setProperty (PropertyKeyConst.accessKey, AccessKey); // SecretKey Alibaba Cloud Authentifizierung, erstellen properties.setProperty (PropertyKeyConst.secretkey, SecretKey); // SecretKey Alibaba Cloud Authentifizierung, erstellen properties.setProperty (PropertyKeyConst.secretkey, SecretKey); // Setzen Sie die Zeitlimitzeit senden, Einheit Millisecond Properties.setProperty (PropertyKeyConst.SendmsgTimeoutMillis, "3000"); // Setzen Sie den Namen TCP Access Domain (siehe die öffentliche Cloud -Produktionsumgebung hier als Beispiel). Produzent = Onsfactory.CreateProcer (Eigenschaften); // Bevor Sie eine Nachricht senden, müssen Sie die Startmethode aufrufen, um den Produzenten zu starten, und müssen sie nur einmal an den Produzenten anrufen. Start (). } öffentlicher Produzent getProDucer () {Return Producer; }}Federkonfiguration
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect /u516C/u7F51/u914D/u7F6Eopenservices.ons.producerBean.producerId = pidopenservices.ons.producerBean.accessKey = openservices.ons.producerBean.secretKey = openservices.ons.producerBean.secretKey = OpenServices.ons.Producerbean.onsaddr = Public Network, Hangzhou Public Cloud Production
Methode 1 Liefern Sie den Nachrichtencode:
try {string jsonc = jsonUtils.tojson (elfMessage); Message message = new message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = produzierer.send (message); if (sendResult!Methode 2 Zustellmeldungscode: (kann alle 1000 -fachen gestartet/geschlossen werden)
proconerbean.start (); try {string jsonc = jsonUtils.tojson (eleenMessage); Message message = new message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = produzierer.send (message); if (sendResult! Producerbean.Shutdown ();Methode 3: Liefern Sie die Nachricht
try {string jsonc = jsonUtils.tojson (elfMessage); Message message = new message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); Produzent produzent = processerbeansingleton.getInstance (). GetProducer (); SendResult sendResult = produzierer.send (message); if (sendResult! "+e.getMessage (), e); thread.sleep (1000); // Wenn es eine Ausnahme gibt, schlafen Sie 1 Sekunde lang}Der Code, der die Nachricht sendet, muss die Ausnahme abrufen, sonst wird er wiederholt gesendet.
Das Thema hier wird selbst erstellt. ElfMessage ist der inhaltlich zugesandte Inhalt. Ich bin das Objekt, das ich von mir selbst erstellt habe.
3. Verbraucher
Konfigurieren Sie die Startklasse:
@Configuration@ConditionalonProperty (value = "ConsumerConfig.Enabled", hat Value = "true", matchiFming = true) Public Class ConsumerFig {private logger logger = loggerfactory.getLogger (loggerappendertype.smsdist.name ()); @Bean public Consumer ConsumerFactory () {// Verbraucher können hier keine Eigenschaften umbenennen, Verbraucherproperties = new Properties (); ConsumerProperties.setProperty (PropertyKeyConst.Consumerid, mqconfig.consumer_id); ConsumerProperties.setProperty (PropertyKeyConst.accesskey, Mqconfig.access_key); ConsumerProperties.setProperty (PropertyKeyConst.secretkey, mqconfig.secret_key); //consumerProperties.setProperty(PropertyKeyConst.Consumetheadnums, mqconfig.thread_num); ConsumerProperties.setProperty (PropertyKeyConst.onsaddr, Mqconfig.onsaddr); Consumer Consumer = ONSFactory.CreateConsumer (ConsumerProperties); Consumer. // neuer entsprechender Hörer Consumer.start (); logger.info ("ConsumerConfig Start Erfolg"); Verbraucher zurück; }}Sie müssen die richtige CID und ONSADDR auswählen. Sie können es hier konfigurieren, indem Sie Ihre eigene, Verbraucher -Thread -Anzahl usw. verwenden.
Erstellen Sie eine Message -Listener -Klasse und verbrauchen Sie Nachrichten:
@ComponentPublic Class Messagelistener implementiert Messagelistener {private logger logger = loggerfactory.getLogger ("erinnern"); geschützte statische elfreposit elfreposit; @Resource public void setElevenreposit (eleReposit elfreposit) {Messagelistener .ElevenReposit = elfreposit; } @Override öffentlicher Aktionsverbrauch (Nachrichtennachricht, ConsumerConText ConsumContext) {if (message.getttopic (). Equals ("eigenes Thema") {// Vermeiden Sie es, andere Nachrichten zu konsumieren. String res = neuer String (Körper); // res ist der vom Produzent gesendete Nachrichteninhalt // Geschäftscode} else {logger.warn ("!"); }} catch (Ausnahme e) {logger.Error ("Messagelistener.Consume -Fehler:" + e.getMessage (), e); } logger.info ("messagelistener.receive message"); // Wenn Sie die Funktion des Nachrichtenumbaues testen möchten, können Sie die Aktion ersetzen. } else {logger.warn (); Return Action.Reconsumelater; }}Beachten Sie, dass die Verbraucher, da das Objekt mit mehreren Threaded sind, static+eingestellt werden muss, um die Objektpegel auf den Prozess zu erhöhen, damit mehrere Threads geteilt werden können, die Methoden und Variablen der übergeordneten Klasse jedoch nicht aufgerufen werden können.
Der Verbraucherstatus kann überprüfen, ob der Verbraucher erfolgreich angeschlossen ist, ob sich der Verbrauch verzögert, Verbrauchsgeschwindigkeit usw.
Durch das Zurücksetzen der Verbrauchsseite kann alle Nachrichten gelöscht werden
3.. Dinge zu beachten
1. Die gesendete maximale Nachricht ist 256 kb
2. Die Nachricht existiert bis zu 3 Tage
3. Die Standardzahl der Threads auf der Verbraucherseite beträgt 20
V.
5. Wenn lokale Tests oder Starts einsAddr durch ein öffentliches Netzwerk ersetzen, wird der Fehler nicht gestartet.
Das obige ist der gesamte Inhalt dieses Artikels. Ich hoffe, es wird für das Lernen aller hilfreich sein und ich hoffe, jeder wird Wulin.com mehr unterstützen.