1. Site Web officiel du cloud Alibaba --- Aidez Document
https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.wwtiuh
Suivez les étapes du site officiel pour créer un sujet, postulez pour la publication (producteur) et postulez pour abonnement (consommateur)
2. Code
1. Configuration:
classe publique MQConfig {/ ** * Veuillez remplacer le XXX suivant avant de démarrer le test * / public static final String public_topic = "test"; // public static static public_produner_id = "pid_scheduler"; Public Static Final String public_consumer_id = "cid_service"; public static final String Access_Key = "123"; public static final String Secret_Key = "123"; Public Static Final String Tag = ""; Public Static Final String Thread_num = "25"; // Nombre de threads de consommation / ** * ONSADDR Veuillez configurer en fonction de différentes régions * Test de réseau public: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet * Production publique Cloud: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4Client-Internal * Hangzhou Financial Cloud: http://jbponsaddr4client-interal *.com:8080/Rocketmq/nsaddr4client-internal * Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal * / public static final string onsaddr = "http://onsaddr4client-internal";}Le cloud d'Onsaddr Alibaba utilise la production de cloud public et les tests utilisent le réseau public
Différents services peuvent définir différentes balises, mais si le volume de messages est important, il est recommandé de créer un nouveau sujet.
2. Producteur
Méthode 1:
Fichier de configuration: producteur.xml
<? xml version = "1.0" Encoding = "UTF-8"?> <! Doctype Beans public "- // printemps // dtd bean // en" "http://www.springframework.org/dtd/spring-beans.dtd"> <Beans> <Bean Id = "producteur" Init-Method = "start" name = "Properties"> <map> <entrée key = "producerid" value = "" /> <! - pid, veuillez remplacer -> <entrée key = "accessKey" value = "" /> <! - Access_key, veuillez remplacer -> <entrée clé = "SecretKey" http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4Client-Internet Public Cloud Production: http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4Client-Internal Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rockingmq/nsaddr4client-internal -> <entrée key = "onSaddr" Value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-intern" /> </ map> </ propriété> </ bean> </ beans>
Méthode de démarrage 1, définie dans le cadre global de la classe:
// Initialisez le producteur Private ApplicationContext CTX; producteur privé producteur; @Value ("$ {productonfig.enabled}") // commutateur, élément de configuration de ressort, true est sur, false redonne booléenne privée productonfigerableabled; @PostConstruct public void init () {if (true == produConFiGenabled) {ctx = new classPathxmlApplicationContext ("producer.xml"); producteur = (producerbean) ctx.getBean ("producteur"); }}PS: J'ai récemment découvert une fosse. Si le producteur est démarré dans la méthode ci-dessus, une fois qu'il commencera davantage, il provoquera FullGC. Par conséquent, vous pouvez passer à la méthode d'annotation suivante pour démarrer manuellement et arrêter où vous l'utilisez.
Méthode 2: Configurer la classe (aucun XML requis)
@ConfigurationPublic Class producerBeanConfig {@Value ("$ {openservices.ons.producerbean.producerid}") Producerid de chaîne privée; @Value ("$ {openservices.ons.producerbean.accessKey}") Private String AccessKey; @Value ("$ {openservices.ons.producerbean.secretKey}") private String SecretKey; Producteur privé Bodice-BreeBean; @Value ("$ {openservices.ons.producerbean.onsaddr}") String privé onSaddr; @Bean public producerbean oneproduner () {producerbean produnerbean = new produnerBean (); Propriétés Properties = New Properties (); Properties.SetProperty (PropertyKeyConst.Producerid, producerid); Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); Properties.SetProperty (propriétéKeyConst.onsaddr, ONSADDR); producerbean.setproperties (propriétés); retour productorbean; }}PS: Après ce double 11, il a été constaté que les deux méthodes ci-dessus ne sont pas très appropriées pour le volume de données important et les situations multiples, et les performances sont très médiocres, il est donc recommandé d'utiliser 3.
Méthode 3: (Aucun XML n'est requis)
@ComponentPublic Class produceBeanSingleton {@Value ("$ {openservices.ons.producerbean.producerid}") Producerid de chaîne privée; @Value ("$ {openservices.ons.producerbean.accessKey}") Private String AccessKey; @Value ("$ {openservices.ons.producerbean.secretKey}") private String SecretKey; @Value ("$ {openservices.ons.producerbean.onsaddr}") String privé onSaddr; Producteur statique privé; classe statique privée singletonholder {private static final producerBeanSingleton instance = new produnerBeanSingleton (); } Private ProducerBeanSingleton () {} public static final produnerBeansingleton getInstance () {return singletonholder.instance; } @PostConstruct Public void init () {// Proprier Configuration Initialisation Properties Properties = New Properties (); // producteur ID Properties.SetProperty (propriétéKeyConst.Producerid, producerid); // AccessKey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.AccessKey, AccessKey); // Secretkey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // Secretkey Alibaba Cloud Authentication, Create Properties.SetProperty (PropertyKeyConst.SecretKey, SecretKey); // Définissez l'heure du délai d'envoi, unité Millisecond Properties.SetProperty (propriétéKeyConst.SendmsGtimeoutMillis, "3000"); // Définissez le nom de domaine d'accès TCP (voir l'environnement de production du cloud public comme exemple ici) Properties.SetProperty (propriétéKeyConst.onsaddr, ONSADDR); producteur = onsfactory.CreateProducer (propriétés); // Avant d'envoyer un message, vous devez appeler la méthode de démarrage pour démarrer le producteur, et vous n'avez besoin de l'appeler qu'une seule fois à producteur.start (); } producteur public getProducer () {return producteur; }}Configuration de ressort
Spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectConsumerConFig.enabled = trueProducerConFig.enable /U516C/U7F51/U914D/U7F6EOpenServices.ons.Producerbean.Producerid = PidopenServices.ons.Producerbean.AccessKey = OpenServices.ons.Producerbean.Secretkey = OpenServices.ons.producerbean.Secretkey = openservices.ons.producerbean.onsaddr = réseau public, production publique de cloud hangzhou
Méthode 1 Livrer le code de message:
essayez {String JSONC = JSONUTILS.TOJSON (ElevenMessage); Message message = nouveau message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = producer.send (message); if (sendResult! = null) {logger.info (". Envoyer le message du message mq!";} else {logger.warn (". SendResult est null ......");}} catch (exception e) {logger.warn ("DoubleevenallPreservice"); thread.sleep (1000); // il y a une exception, sommeil pour 1 seconde}}Méthode 2 Code de message de livraison: (peut être démarré / fermé toutes les 1000 fois)
producerbean.start (); try {string jsonc = jsonUtils.tojson (elevenMessage); Message message = nouveau message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); SendResult sendResult = producer.send (message); if (sendResult! = null) {logger.info (". Envoyer le succès du message mq!";} else {logger.warn (". SendResult est null ......");}} Catch (exception e) {logger.warn ("DoubleeVevenallPreservice"); thread.sleep (1000); // y a une exception, sommeil, pour 1 seconde} productor.Méthode 3: livrer le message
essayez {String JSONC = JSONUTILS.TOJSON (ElevenMessage); Message message = nouveau message (mqconfig.topic, mqconfig.tag, jsonc.getBytes ()); Producteur producteur = producerBeanSingleton.getInstance (). GetProducer (); SendResult sendResult = producer.send (message); if (sendResult! = null) {logger.info ("DoubleelevenMidService.Send MQ Success! "+ e.getMessage (), e); thread.sleep (1000); // s'il y a une exception, dormez 1 seconde}Le code qui envoie le message doit attraper l'exception, sinon il sera envoyé à plusieurs reprises.
Le sujet ici est créé par vous-même. Elevenmessage est le contenu à envoyer. Je suis l'objet que j'ai créé par moi-même.
3. Consommations
Configurer la classe de démarrage:
@ Configuration @ conditionalonProperty (value = "ConsumerConfig.enabled", haveValue = "true", matchIFMissing = true) Class public ConsumerConfig {private logger logger = loggerFactory.getLogger (loggerAureType.SmgerDist.Name ()); @Bean public Consumer ConsumerFactory () {// Différents consommateurs ne peuvent pas renommer les propriétés ici ConsumerProperties = new Properties (); ConsumerProperties.SetProperty (PropertyKeyConst.ConsumeRid, mqconfig.consumer_id); ConsumerProperties.SetProperty (PropertyKeyConst.AccessKey, MQConfig.Access_Key); ConsumerProperties.SetProperty (PropertyKeyConst.SecretKey, MQConfig.Secret_key); //consumerproperties.setproperty(propertykeyconst.consumethreadnums,mqconfig.thread_num); ConsumerProperties.SetProperty (propriétéKeyConst.onsaddr, mqconfig.onsaddr); Consumer Consumer = OnsFactory.CreateConsumer (ConsumerProperties); Consumer.Subscribe (MQConfig.Topic, MQConfig.tag, New DoubleeleVevenMessageListener ()); // Nouveau auditeur correspondant Consumer.Start (); Logger.info ("ConsumerConfig Start Success."); renvoyer le consommateur; }}Vous devez choisir le bon CID et ONSADDR. Vous pouvez le configurer ici en utilisant votre propre nombre de fils de consommation, etc.
Créez une classe d'écoute de messages et consommez des messages:
@ComponentPublic class MessageListener implémente MessageListener {private logger logger = loggerfactory.getLogger ("remind"); STATIQUE STATIQUE protégé ElevenReposit; @Resource public void SetElevenReposit (ElevenReposit ElevenReposit) {MessageListener .ElevenReposit = ElevenReposit; } @Override Public Action Consommation (message Message, ConsumerConText ConsuNonText) {if (message.getTopic (). Equals ("Propre Topic")) {// Évitez de consommer d'autres messages des erreurs de conversion JSON essaiment {byte [] body = message.getBody (); String res = new String (corps); // res est le contenu du message envoyé par le producteur // code métier} else {logger.warn ("!"); }} catch (exception e) {logger.error ("MessageListener.consume Erreur:" + e.getMessage (), e); } logger.info ("MessageListener.receive Message"); // Si vous souhaitez tester la fonction de republication des messages, vous pouvez remplacer Action.CommitMessage par action.reconsumelater return Action.CommitMessage; } else {logger.warn (); Retour Action.Reconsumelater; }}Notez que puisque les consommateurs sont multipliés, l'objet doit être injecté avec un ensemble statique + pour augmenter le niveau de l'objet au processus, afin que plusieurs threads puissent être partagés, mais les méthodes et variables de la classe parent ne peuvent pas être appelées.
L'état du consommateur peut vérifier si le consommateur est connecté avec succès, si la consommation est retardée, la vitesse de consommation, etc.
La réinitialisation du site de consommation peut effacer tous les messages
3. Choses à noter
1. Le corps de message maximum envoyé est de 256 Ko
2. Le message existe jusqu'à 3 jours
3. Le nombre par défaut de threads du côté consommateur est de 20
4. Si Java raccroche ou que le processeur occupe un montant extrêmement élevé pendant la course, vous pouvez envoyer le fil pour 1s de 1 000 messages lors de l'envoi.
5. Lorsque les tests locaux ou le démarrage, remplacez ONSADDR par un réseau public, sinon l'erreur ne sera pas démarrée.
Ce qui précède est tout le contenu de cet article. J'espère que cela sera utile à l'apprentissage de tous et j'espère que tout le monde soutiendra davantage Wulin.com.