1. موقع وثيقة مساعدة سحابة علي بابا ---
https://help.aliyun.com/document_detail/29536.html؟spm=5176.doc29535.6.555.wwtiuh
اتبع خطوات الموقع الرسمي لإنشاء موضوع ، وتقدم للنشر (منتج) ، وتقدم بطلب للاشتراك (المستهلك)
2. الكود
1. التكوين:
الفئة العامة mqconfig {/** * يرجى استبدال xxx التالي قبل بدء الاختبار */public static final string public_topic = "test" ؛ // public static final string public_producer_id = "pid_scheduler" ؛ Static Final Final String public_consumer_id = "CID_Service" ؛ Static Final String Access_Key = "123" ؛ Static Final String secret_key = "123" ؛ العلامة النهائية الثابتة العامة = "" ؛ Static Final Final Thread_num = "25" ؛ // عدد مؤشرات ترابط المستهلك/*** ONSADDR ، يرجى التكوين وفقًا للمناطق المختلفة* اختبار الشبكة العامة: http://onsaddr- http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Hangzhou Financial Cloud: http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal * Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal */السلسلة النهائية الثابتة العامة onsaddr = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client.يستخدم Onsaddr Alibaba Cloud الإنتاج السحابي العام ، وتستخدم الاختبارات الشبكة العامة
يمكن للخدمات المختلفة تعيين علامات مختلفة ، ولكن إذا كان حجم الرسالة كبيرًا ، فمن المستحسن إنشاء موضوع جديد.
2. المنتج
الطريقة 1:
ملف التكوين: producer.xml
<؟ name = "properties"> <map> <interpt key = "producerId" value = "" /> <!-pid ، يرجى استبدال-> <interpt key = "accessKey" value = "" /> <!-Access_Key ، يرجى استبدال-> <إدخال المفتاح = "SecretKey" value = "" /> <! http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet الإنتاج السحابي العام: http://onsaddr- http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal Shenzhen Financial Cloud: http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsddr4client- value = "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"/> </map> </purstary> </ban> </bans>
طريقة بدء التشغيل 1 ، المحددة في الإعداد العالمي للفئة:
// تهيئة المنتج الخاص ApplicationContext CTX ؛ منتج المنتج الخاص ؛ value ("$ {propererconfig.enabled}") // switch ، عنصر تكوين الربيع ، true قيد التشغيل ، إيقاف تشغيل producerfigenfigenabled ؛ postConstruct public void init () {if (true == producronfigenabled) {ctx = new classPathxMlActionContext ("producer.xml") ؛ Producer = (ProducerBean) ctx.getBean ("منتج") ؛ }}ملاحظة: لقد اكتشفت مؤخرًا حفرة. إذا تم بدء المنتج في الطريقة أعلاه ، بمجرد أن يبدأ أكثر ، فإنه سيؤدي إلى FullGC. لذلك ، يمكنك التغيير إلى طريقة التعليقات التوضيحية التالية للبدء يدويًا وإغلاقها عند استخدامها.
الطريقة 2: تكوين فئة (لا مطلوب XML)
ConfigurationPublic Class ProducerBeanConfig {value ("$ {openServices.ons.producerBean.producerId}") Private String ProducerId ؛ Value ("$ {openServices.ons.producerBean.accesskey}") private string accessKey ؛ Value ("$ {openServices.ons.producerBean.secretkey}") SecretKey سلسلة خاصة ؛ منتج المنتج الخاص ؛ Value ("$ {openServices.ons.producerBean.onsaddr}") سلسلة خاصة onsaddr ؛ Bean Public ProducerBean OneProducer () {ProducerBean ProducerBean = New ProducerBean () ؛ خصائص الخصائص = خصائص جديدة () ؛ Properties.SetProperty (PropertyKeyConst.ProducerId ، ProducerId) ؛ Properties.SetProperty (PropertyKeyConst.Accesskey ، AccessKey) ؛ Properties.SetProperty (PropertyKeyConst.Secretkey ، SecretKey) ؛ Properties.SetProperty (PropertyKeyConst.onsaddr ، onsaddr) ؛ ProducerBean.setProperties (الخصائص) ؛ عودة المنتج }}PS: بعد هذا المزدوج 11 ، وجد أن الطريقتين أعلاه غير مناسبان للغاية لحجم البيانات الكبير والمواقف المتعددة الخيوط ، والأداء ضعيف للغاية ، لذلك يوصى باستخدام 3.
الطريقة 3: (لا يلزم XML)
@componentpublic class producerBeansingleton {value ("$ {openServices.ons.producerBean.producerId}") producerId producerId ؛ Value ("$ {openServices.ons.producerBean.accesskey}") private string accessKey ؛ Value ("$ {openServices.ons.producerBean.secretkey}") SecretKey سلسلة خاصة ؛ Value ("$ {openServices.ons.producerBean.onsaddr}") سلسلة خاصة onsaddr ؛ منتج منتج ثابت خاص ؛ Private Static Class Singletonholder {Private Static Final ProducerBeansingleton مثال = جديد ProducerBeansingleton () ؛ } ProducerBeansingleton () {} producerBeansingleton GetInstance () {return singletonholder.instance ؛ } postConstruct public void init () {// Producer مثيل تكوين خصائص خصائص تهيئة تكوين = خصائص جديدة () ؛ // Producer ID Properties.setProperty (PropertyKeyConst.producerId ، ProducerId) ؛ // Accesskey Alibaba Cloud Authentication ، إنشاء خصائص. // secretkey alibaba cloud مصادقة ، إنشاء خصائص. // secretkey alibaba cloud مصادقة ، إنشاء خصائص. . // قم بتعيين اسم مجال الوصول إلى TCP (انظر بيئة الإنتاج السحابي العام كمثال هنا) Properties.SetProperty (PropertyKeyConst.onsaddr ، onsaddr) ؛ المنتج = onsfactory.createproducer (الخصائص) ؛ // قبل إرسال رسالة ، يجب عليك الاتصال بطريقة البدء لبدء المنتج ، وتحتاج فقط إلى الاتصال بها مرة واحدة إلى Producer.start () ؛ } المنتج العام getProducer () {Return Producer ؛ }}تكوين الربيع
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.mysql5dialectConsumerConfig.Enabled = trueproducerconfig.enabled = true #method 1: endabling. /U516C/U7F51/U914D/U7F6EOPENSERVICES.ONS.ProducerBean.producerId = pidopenservices.ons.producerbean.accesskey = openServices.ons.producerBean.secretkey = openServices.Ons.productke OpenServices.ons.producerbean.onsaddr = الشبكة العامة ، الإنتاج السحابي العام Hangzhou
الطريقة 1 قم بتسليم رمز الرسالة:
حاول {string jsonc = jsonutils.tojson (ElevenMessage) ؛ رسالة رسالة = رسالة جديدة (mqconfig.topic ، mqconfig.tag ، jsonc.getbytes ()) ؛ sendResult SendResult = producer.send (message) ؛ if (sendResult! = null) {logger.info (". أرسل نجاح رسالة MQ!" ؛} آخر {logger.warn (". sendResult is null ......") ؛}} catch (استثناء e) {logger.warn ("DefouneleNallPreservice") ؛الطريقة 2 رمز رسالة التسليم: (يمكن بدء/إغلاق كل 1000 مرة)
producerBean.start () ؛ حاول {String jsonc = jsonutils.tojson (ElevenMessage) ؛ رسالة رسالة = رسالة جديدة (mqconfig.topic ، mqconfig.tag ، jsonc.getbytes ()) ؛ sendResult SendResult = producer.send (message) ؛ if (sendResult! = null) {logger.info (". أرسل رسالة mq message!" ؛} {logger.warn (". sendResult is null ......") ؛}} catch (استثناء e) {logger.warnالطريقة 3: تسليم الرسالة
حاول {string jsonc = jsonutils.tojson (ElevenMessage) ؛ رسالة رسالة = رسالة جديدة (mqconfig.topic ، mqconfig.tag ، jsonc.getbytes ()) ؛ منتج المنتج = producerBeansingleton.getInstance (). getProducer () ؛ sendResult SendResult = producer.send (message) ؛ if (sendResult! = null) {logger.info ("DefineleVenmidservice.Send MQ Message Success! الموضوع هو:" ؛} آخر {logger.warn ("definelevenmidservice.sendresult is null ......") ؛ "+e.getMessage () ، e) ؛ thread.sleep (1000) ؛ // إذا كان هناك استثناء ، نوم لمدة ثانية}الرمز الذي يرسل الرسالة يجب أن يلتقط الاستثناء ، وإلا سيتم إرساله مرارًا وتكرارًا.
الموضوع هنا تم إنشاؤه بمفردك. Elevenmessage هو المحتوى الذي سيتم إرساله. أنا الشيء الذي أنشأته بنفسي.
3. المستهلكون
تكوين فئة بدء التشغيل:
@configuration@intenalonproperty (value = "consulterconfig.endabled" ، havevalue = "true" ، matchifmissing = true) class public classeReConfig {private logger logger = loggerfactory.getLogger (loggerAppenderType.smsdist.name ()) ؛ Bean Bublic ConsumerFactory () {// لا يمكن للمستهلكين المختلفين إعادة تسمية الخصائص هنا الاستهلاكية = خصائص جديدة () ؛ consumerProperties.setProperty (propertykeyconst.consumerid ، mqconfig.consumer_id) ؛ consumerProperties.setProperty (propertyKeyConst.Accesskey ، mqconfig.access_key) ؛ consumerProperties.setProperty (propertyKeyConst.Secretkey ، mqconfig.secret_key) ؛ //consumerProperties.setProperty(PropertyKeyConst.ConsumethReadNums،mqconfig.thread_num) ؛ ConsumerProperties.setProperty (PropertyKeyConst.onsaddr ، mqconfig.onsaddr) ؛ المستهلك المستهلك = onsfactory.createconsumer (المستهلكين) ؛ المستهلك. // مستمع جديد مقابل المستمع. START () ؛ logger.info ("ConsulterConfig START.") ؛ إرجاع المستهلك ؛ }}تحتاج إلى اختيار CID الصحيح و ONSADDR. يمكنك تكوينه هنا باستخدام عدد مؤشرات ترابط المستهلك الخاص بك ، إلخ ، إلخ.
قم بإنشاء فئة مستمع رسالة واستهلاك الرسائل:
ComponentPublic class messagelistener تنفذ messagelistener {private logger logger = loggerFactory.getLogger ("ذكري") ؛ محمية ثابت واحد من أحد عشر ايليفيوسيت ؛ Resource public void setelevenreposit (ElevenReposit ElevenReposit) {messagelistener .elevenreposit = ElevenReposit ؛ } Override الاستهلاك الإجراء العام (رسالة الرسائل ، الاستهلاك الاستهلاك الاستهلاك) {if (message.getTopic (). يساوي ("موضوع خاص")) {// تجنب استهلاك أخطاء تحويل json الأخرى جرب {byte [] body = message.getBody () ؛ سلسلة الدقة = سلسلة جديدة (الجسم) ؛ // res هو محتوى الرسالة المرسلة بواسطة Producer // Business Code} آخر {logger.warn ("!") ؛ }} catch (استثناء e) {logger.error ("messagelistener.consume error:" + e.getMessage () ، e) ؛ } logger.info ("messagelistener.receive message") ؛ // إذا كنت ترغب في اختبار وظيفة إعادة نشر الرسالة ، فيمكنك استبدال Action.CommitMessage باستخدام Action.Reconsumelater Return Action.CommitMessage ؛ } آخر {logger.warn () ؛ إرجاع العمل. reconsumelater ؛ }}لاحظ أنه نظرًا لأن المستهلكين متعدد الخيوط ، يجب حقن الكائن باستخدام مجموعة ثابتة+لرفع مستوى الكائن إلى العملية ، بحيث يمكن مشاركة مؤشرات الترابط المتعددة ، ولكن لا يمكن استدعاء طرق ومتغيرات الفئة الأصل.
يمكن لحالة المستهلك التحقق مما إذا كان المستهلك متصلاً بنجاح ، ما إذا كان الاستهلاك قد تأخر ، وسرعة الاستهلاك ، إلخ.
يمكن إعادة تعيين موقع الاستهلاك مسح جميع الرسائل
3. أشياء يجب ملاحظتها
1. الحد الأقصى للرسالة المرسلة هو 256 كيلو بايت
2. الرسالة موجودة لمدة تصل إلى 3 أيام
3. العدد الافتراضي للمواضيع على جانب المستهلك هو 20
4. إذا كانت Java معلقة أو تحتل وحدة المعالجة المركزية مبلغًا مرتفعًا للغاية أثناء التشغيل ، فيمكنك إرسال الخيط مقابل 1s من كل 1000 رسالة عند إرسالها.
5. عند الاختبار المحلي أو بدء التشغيل ، استبدل ONSADDR بشبكة عامة ، وإلا فلن يتم بدء الخطأ.
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.