1. مقدمة
في الآونة الأخيرة ، تحتاج الشركة إلى استخدام قائمة انتظار رسائل Cloud Alibaba. لجعلها أكثر ملاءمة للاستخدام ، قضيت بضعة أيام في تغليف قائمة انتظار الرسائل في طريقة استدعاء API لتسهيل استدعاء النظام الداخلي. لقد تم الانتهاء منه الآن. نحن هنا نسجل العملية والتقنيات ذات الصلة المستخدمة ، ومشاركة معك.
توفر Alibaba Cloud الآن خدمات رسائلين: خدمة MNS وخدمة ONS. أعتقد أن MNS هي نسخة مبسطة من ONS ، ويتطلب استهلاك رسائل MNS استراتيجيات الاقتراع المخصصة. في المقابل ، تكون وظائف وضع النشر والاشتراك في ONS أكثر قوة (على سبيل المثال ، مقارنةً بـ MNS ، ويوفر ORS تتبع الرسائل وتسجيله ومراقبة وظائف أخرى) ، وواجهة برمجة التطبيقات الخاصة بها أكثر ملاءمة للاستخدام. وقد سمع أيضًا أن علي بابا لن يطور MNS في المستقبل ، ولكنه يحافظ عليه فقط. ستقوم خدمة ONS باستبدال خدمة MNS تدريجياً وتصبح المنتج الرئيسي لخدمة رسائل Alibaba. لذلك ، إذا كانت هناك حاجة إلى استخدام قوائم قوائم الرسائل ، فمن المستحسن عدم استخدام MNS مرة أخرى. استخدام ONS هو الخيار الأفضل.
التقنيات المعنية: الربيع ، الانعكاس ، الوكيل الديناميكي ، التسلسل التسلسلي جاكسون وإلغاء التسلسل
قبل قراءة المقالة التالية ، تحتاج إلى قراءة الوثائق أعلاه لفهم المفاهيم ذات الصلة (الموضوع ، المستهلك ، المنتج ، العلامة ، إلخ) وتطبيقات الإرسال والرمز البسيطة المقدمة في الوثائق.
منشور المدونة هذا مخصص فقط للأصدقاء الذين لديهم قاعدة معرفة في قوائم قوائم الرسائل. أنا سعيد جدًا بمساعدة الجميع. لا توبخ أي شخص لا يفهمها ، لأنه يعني أن طريقك خاطئ.
2. خطة التصميم
1. إرسال الرسالة
في بنية CSS البسيطة ، على افتراض أن الخادم سيستمع إلى الرسالة المرسلة من قبل منتج موضوع ، يجب أولاً توفير واجهة برمجة تطبيقات العميل. يحتاج العميل فقط إلى استدعاء واجهة برمجة التطبيقات ويمكنه إنتاج رسائل من خلال المنتج.
2. استقبال الرسائل
نظرًا لصياغة API بواسطة الخادم ، فإن الخادم بالطبع يعرف أيضًا كيفية استهلاك هذه الرسائل.
في هذه العملية ، يلعب الخادم فعليًا دور المستهلكين ، ويقوم العميل بتشغيل دور المنتجين فعليًا ، ولكن يتم صياغة قواعد المنتجين لإنتاج الرسائل لتلبية احتياجات استهلاك المستهلك.
3. الهدف النهائي
نريد إنشاء حزمة JAR منفصلة تسمى قائمة الانتظار لتوفير تطبيقات محددة من التبعيات ونشر الاشتراكات للمنتجين والمستهلكين.
3. إرسال الرسالة
1. يقدم المستهلكون واجهات
topic (name = "kdyzm" ، producerId = "kdyzm_producer") الواجهة العامة userqueueresource {tag ( tag ("test2") public void GeneorUserInfo1 ( @body @key ("userInfoHandler1") usermodel user) ؛}نظرًا لأن الموضوع والمنتج في علاقة N: 1 ، يتم استخدام ProducerId مباشرة كخاصية للموضوع ؛ العلامة هي حالة تصفية حاسمة للغاية ، ويستخدمها المستهلكون لتصنيف الرسائل لأداء معالجة أعمال مختلفة ، لذلك يتم استخدام العلامة كشرط توجيه هنا.
2. المنتج يرسل رسائل باستخدام واجهة برمجة التطبيقات التي يوفرها المستهلك
نظرًا لأن المستهلكين يوفرون فقط واجهات للمنتجين لاستخدامها ، فلا توجد وسيلة لاستخدام واجهات مباشرة لأنه لا توجد وسيلة لإنشاء مثيل لها. هنا نستخدم الوكيل الديناميكي لإنشاء كائنات. في واجهة برمجة التطبيقات التي يوفرها المستهلكون ، أضف التكوين التالي لتسهيل المنتجين لاستيراد التكوين واستخدامه مباشرة. هنا نستخدم Config Spring بناءً على Java. من فضلك أعلم.
ConfigurationPublic Class QueUeConfig {autowiredbean public userqueeresource userqueueresource () {return queueresourcefactory.createproxyqueueresource (userqueueresource.class) ؛ }}3. تغليف قائمة انتظار لإرسال رسالة المنتج
يجب تحديد جميع التعليقات التوضيحية في 1 أعلاه (الموضوع ، العلامة ، الجسم ، المفتاح) وفئات QueueresourceFactory المستخدمة في 2 في قائمة الانتظار. تعريف التعليق التوضيحي يحدد القواعد فقط. التنفيذ الحقيقي هو في الواقع في QueueresourceFactory.
استيراد java.lang.reflect.invocationHandler ؛ استيراد java.lang.reflect.method ؛ استيراد java.lang.reflect.proxy ؛ استيراد org.slf4j.logger com.aliyun.openservices.ons.api.producer ؛ استيراد com.aliyun.openservices.ons.api.sendresult ؛ استيراد com.wy.queue.core.api.mqconnection ؛ استيراد com.wy.quore.Utils.jacksonserializer ؛ import com.wy.queue.core.utils.queuecorespringUtils ؛ الطبقة العامة QueUeresourceFactory تنفذ invocationHandler {private static final logger = loggerfactory.getLogger (queueresourcefactory.class) ؛ اسم السلسلة الخاصة. سلسل خاص المنتج. private jacksonserializer serializer = new JackSonserializer () ؛ بادئة السلسلة النهائية الثابتة الخاصة = "PID_" ؛ public QueUeresourceFactory (string topicname ، string producerId) {this.topicName = topicName ؛ this.producerId = producerId ؛ } static public <T> t createproxyqueueresource (class <T> clazz) {String TopicName = mqutils.getTopicName (clazz) ؛ String ProducerId = mqutils.getProducerId (clazz) ؛ t target = (t) proxy.newproxyinstance (queueresourcefactory.class.getClassloader () ، فئة جديدة <؟> [] {clazz} ، QueueresourceFactory (thisionName ، producerId)) ؛ الهدف الإرجاع ؛ } Override الكائن العام invoke (وكيل الكائن ، طريقة الطريقة ، الكائن [] args) يلقي رمي {if (args.length == 0 || args.length> 1) {رمي new runTimeException ("فقط قبول param في واجهة Queueresource.") ؛ } string tagName = mqutils.getTagName (method) ؛ ProducerFactory ProducerFactory = QueUecorSpringUtils.getBean (ProducerFactory.class) ؛ mqconnection connectionInInfo = queuecorespringutils.getBean (mqconnection.class) ؛ منتج المنتج = producerfactory.createproducer (prefix+connectionInfo.getPrefix ()+"_"+producerId) ؛ // إرسال رسالة رسالة msg = رسالة جديدة (// // الموضوع الذي تم إنشاؤه في وحدة التحكم ، أي اسم الموضوع الذي تنتمي إليه الرسالة. connectionInInfo.getPrefix ()+"_"+topicnam الشكل الثنائي للبيانات ، لا يتداخل MQ مع أي ، // المنتج والمستهلك مطلوب التفاوض بشأن تسلسل تسلسلي وسلسل تسلسلي. SendResult SendResult = producer.send (msg) ؛ logger.info ("إرسال نجاح الرسالة. معرف الرسالة هو:" + sendResult.getMessageId ()) ؛ العودة لاغية. }}هنا نشرنا بشكل خاص الحزمة المخصصة وأسماء الحزم المستخدمة من قبل أطراف ثالثة لتسهيل التمييز.
ما الذي يتم فعله بالضبط هنا؟
تتمثل عملية إرسال رسالة في إنشاء كائن وكيل على الوكيل الديناميكي. سيتم اعتراض الكائن عند استدعاء الطريقة. أولاً ، تحليل جميع التعليقات التوضيحية ، مثل اسم TopicName و ProducerId و TAG وغيرها من المعلومات الرئيسية من التعليقات التوضيحية ، ثم اتصل بـ Alibaba SDK لإرسال الرسالة. العملية بسيطة للغاية ، ولكن لاحظ أنه عند إرسال الرسائل هنا ، يتم تقسيمها إلى بيئات. بشكل عام ، تميز المؤسسة الآن ثلاث بيئات: ضمان الجودة ، التدريج ، والمنتج. من بينها ، ضمان الجودة والتدريج هي بيئات اختبار. لقوائم الرسائل ، هناك أيضًا ثلاث حلقات. ومع ذلك ، في البيئة ، غالبًا ما تستخدم بيئات ضمان الجودة والانتقال نفس حساب Alibaba لتقليل التكاليف ، وبالتالي سيتم وضع الموضوع الذي تم إنشاؤه و ProductID في نفس المنطقة. وبهذه الطريقة ، لا يُسمح بوجود اسم Toffice With نفس الاسم ، لذلك تتم إضافة بادئة البيئة لتمييزها ، مثل QA_TopicName ، PID_STAGING_PRODUCERID ، إلخ ؛ بالإضافة إلى ذلك ، يوفر قائمة الانتظار واجهة MQConnection للحصول على معلومات التكوين ، وتحتاج خدمات المنتج فقط إلى تنفيذ هذه الواجهة.
4. المنتج يرسل الرسائل
AUTOWIRED userqueueresource userqueueresource ؛ Override public void sendMessage () {UserModel usermodel = new UserModel () ؛ usermodel.setName ("kdyzm") ؛ usermodel.setage (25) ؛ userqueueresource.handleuserInfo (UserModel) ؛ }هناك حاجة إلى بضعة أسطر من التعليمات البرمجية فقط لإرسال الرسالة إلى الموضوع المحدد ، وهو أرق بكثير من رمز الإرسال الأصلي.
4. استهلاك الأخبار
بالمقارنة مع إرسال الرسائل ، فإن استهلاك الرسائل أكثر تعقيدًا.
1. تصميم استهلاك الرسائل
نظرًا لأن الموضوع والمستهلك علاقة n: n ، يتم وضع المستهلك على طريقة التنفيذ المحددة للمستهلك
@controller@queueresourcepublic class userqueueresourceImpl تنفذ userqueeresource {private logger logger = loggerFactory.getLogger (this.getClass ()) ؛ OverRideConsumerAnnotation ("kdyzm_consumer") public void GeneftUserInfo (usermodel user) {logger.info ("message 1 receed: {}" ، new gson (). tojson (user)) ؛ } OverRideConsumerAnnotation ("Kdyzm_Consumer1") public void GeneftUserInfo1 (usermodel user) {logger.info ("message 2 receed: {}" ، new gson (). tojson (user)) ؛ }}فيما يلي اثنين من التعليقات التوضيحية الجديدة Queueresource و ConsumerAnnotation. سيتم مناقشة هذين التعليقات التعليقات التوضيحية في المستقبل. قد يسألني شخص ما لماذا يجب أن أستخدم اسم المستهلك بدلاً من اسم المستهلك ، لأن اسم المستهلك يتضاعف مع الاسم في SDK المقدمة من Aliyun. . . .
هنا ، يوفر المستهلكون واجهة API للمنتجين لتسهيل المنتجين لإرسال الرسائل ، وتنفيذ المستهلكين الواجهة لاستهلاك الرسائل التي يرسلها المنتجون. كيفية تنفيذ واجهة API هي تنفيذ المراقبة ، وهو منطق حاسم نسبيًا.
2. أكور النواة ينفذ المنطق الأساسي للاستماع إلى قائمة انتظار الرسائل
الخطوة 1: استخدم طريقة الاستماع لحاوية الزنبرك للحصول على جميع الفاصوليا مع تعليقات Queueresource
الخطوة 2: توزيع حبوب المعالجة
كيف تتعامل مع هذه الفاصوليا؟ كل فول هو في الواقع كائن. باستخدام كائن ، مثل كائن userqueueresourceImpl في المثال أعلاه ، يمكننا الحصول على كائن Bytecode الواجهة التي يتم تنفيذها بواسطة الكائن ، ثم الحصول على التعليقات التوضيحية على واجهة userqueuerourourourous والتعليقات التوضيحية على الأساليب والطرق. بالطبع ، يمكن أيضًا الحصول على التعليقات التوضيحية على طريقة تنفيذ userqueueresourceImpl. هنا سأستخدم المستهلك كمفتاح ، ويتم تغليف المعلومات المتبقية ذات الصلة كقيمة ومخزنة في كائن MAP. الرمز الأساسي هو كما يلي:
class <؟> clazz = ResourceImpl.getClass () ؛ class <؟> clazzif = clazz.getInterfaces () [0] ؛ الطريقة [] الأساليب = clazz.getMethods () ؛ string topicname = mqutils.getTopicName (clazzif) ؛ لـ (method m: methods) {consumerannotation consumeranno = m.getAnnotation (consumerannotation.class) ؛ if (null == consumeranno) {// logger.error ("method = {} تحتاج إلى تعليق توضيحي للمستهلك." ، m.getName ()) ؛ يكمل؛ } string consumerId = consumeranno.value () ؛ if (stringUtils.isempty (consuerid)) {logger.error ("method = {} لا يمكن أن يكون المستهلك فارغًا" ، m.getName ()) ؛ يكمل؛ } class <؟> [] parametertypes = m.getParameterTypes () ؛ طريقة ResourceIfMethod = null ؛ حاول {ResourceIfMethod = clazzif.getMethod (m.getName () ، parametertypes) ؛ } catch (nosuchmethodException | securityException e) {logger.error ("لا يمكن العثور على method = {} في Super Interface = {}." ، m.getName () ، clazzif.getCanonicalName () ، e) ؛ يكمل؛ } string tagName = mqutils.getTagName (resoundifmethod) ؛ consumersmap.put (consuerid ، new MethodInfo (TopicName ، tagName ، M)) ؛ }الخطوة 3: إجراءات الاستهلاك من خلال التفكير
أولاً ، حدد توقيت تنفيذ إجراء الانعكاس ، أي استمع إلى رسائل جديدة
ثانياً ، كيف تنفذ إجراءات التفكير؟ لن أخوض في التفاصيل. أحذية الأطفال ذات الأسس المتعلقة بالانعكاس تعرف كيفية صنعها. الرمز الأساسي هو كما يلي:
mqconnection connectionInInfo = queuecorespringutils.getBean (mqconnection.class) ؛ String TopicPrefix = connectionInfo.getPrefix ()+"_" ؛ String ConsumerPrefix = prefix+connectionInFo.getPrefix ()+"_" ؛ لـ (String consumerId: consumersmap.keyset ()) {methodInfo methodInfo = consumersMap.get (consumerId) ؛ Properties ConnectionProperties = convertToProperties (connectionInfo) ؛ // معرف المستهلك الذي قمت بإنشائه في Console ConnectionProperties.put (propertykeyconst.consumerid ، المستهلكينبيكس+المستهلك) ؛ المستهلك المستهلك = onsfactory.createConsumer (ConnectionProperties) ؛ consumer.subscribe (topicprefix+methodInfo.getTopicName () ، methodInfo.getTagName () ، new messagelistener () {// الاشتراك في استهلاك الإجراءات العامة المتعددة (message ، utf-uppger. topic = {} ، tag = {} ، consumerId = {} ، message = {} "، topicPrefix+methodInfo.getTopicName () ، methodInfo.getTagName () ، classeDPrefix+consumerid ، messageBody) ؛ methodInfo.getMethod () ؛ method.getparameter () [0] ؛ المستهلك. بدأ logger.info ("المستهلك = {}." ، consumerPrefix+consumerId) ؛ }5. انظر رابط GIT أدناه للحصول على الرمز الكامل
https://github.com/kdyzm/queue-core.git
ما سبق هو كل محتوى هذه المقالة. آمل أن يكون ذلك مفيدًا لتعلم الجميع وآمل أن يدعم الجميع wulin.com أكثر.