1. คำนำ
เมื่อเร็ว ๆ นี้ บริษัท มีความจำเป็นที่จะต้องใช้คิวข้อความ Alibaba Cloud เพื่อให้ใช้งานได้ง่ายขึ้นฉันใช้เวลาสองสามวันในการห่อหุ้มคิวข้อความลงในวิธีการเรียก API เพื่ออำนวยความสะดวกในการโทรของระบบภายใน ตอนนี้เสร็จสมบูรณ์แล้ว ที่นี่เราบันทึกกระบวนการและเทคโนโลยีที่เกี่ยวข้องที่ใช้และแบ่งปันกับคุณ
Alibaba Cloud ให้บริการข้อความสองรายการ: บริการ MNS และบริการ ONS ฉันคิดว่า MNS เป็นรุ่นที่ง่ายของ ONS และการใช้ข้อความ MNS ต้องใช้กลยุทธ์การสำรวจที่กำหนดเอง ในทางตรงกันข้ามฟังก์ชั่นการเผยแพร่และการสมัครสมาชิกของ ONS นั้นมีประสิทธิภาพมากกว่า (ตัวอย่างเช่นเมื่อเทียบกับ MNS, ONS ให้การติดตามข้อความการบันทึกการตรวจสอบและฟังก์ชั่นอื่น ๆ ) และ API นั้นสะดวกกว่าที่จะใช้งาน นอกจากนี้ยังมีได้ยินว่าอาลีบาบาจะไม่พัฒนา MNS อีกต่อไปในอนาคต แต่รักษาไว้เท่านั้น บริการ ONS จะค่อยๆแทนที่บริการ MNS และกลายเป็นผลิตภัณฑ์หลักของบริการข้อความของอาลีบาบา ดังนั้นหากมีความจำเป็นที่จะต้องใช้คิวข้อความขอแนะนำให้ไม่ใช้ MNS อีกครั้ง การใช้ ONS เป็นตัวเลือกที่ดีที่สุด
เทคนิคที่เกี่ยวข้อง: ฤดูใบไม้ผลิ, การสะท้อน, พร็อกซีแบบไดนามิก, การทำให้เป็นอนุกรมของแจ็คสันและ deserialization
ก่อนที่จะอ่านบทความต่อไปนี้คุณต้องอ่านเอกสารข้างต้นเพื่อทำความเข้าใจแนวคิดที่เกี่ยวข้อง (หัวข้อผู้บริโภคผู้ผลิตแท็ก ฯลฯ ) และการส่งและรับการใช้งานรหัสอย่างง่ายที่มีให้ในเอกสาร
โพสต์บล็อกนี้มีไว้สำหรับเพื่อนที่มีฐานความรู้เกี่ยวกับคิวข้อความเท่านั้น ฉันมีความสุขมากที่ได้ช่วยเหลือทุกคน อย่าดุใครที่ไม่เข้าใจเพราะหมายความว่าเส้นทางของคุณผิด
2. แผนการออกแบบ
1. การส่งข้อความ
ในสถาปัตยกรรม CSS อย่างง่ายสมมติว่าเซิร์ฟเวอร์จะฟังข้อความที่ส่งโดยผู้ผลิตหัวข้อควรให้ API ไคลเอนต์ก่อน ลูกค้าต้องการเพียงแค่เรียก API และสามารถสร้างข้อความผ่านผู้ผลิต
2. การรับข้อความ
เนื่องจาก API ถูกกำหนดโดยเซิร์ฟเวอร์เซิร์ฟเวอร์ของหลักสูตรจึงรู้วิธีการใช้ข้อความเหล่านี้
ในกระบวนการนี้เซิร์ฟเวอร์มีบทบาทของผู้บริโภคจริง ๆ และลูกค้ามีบทบาทของผู้ผลิตจริง ๆ แต่กฎสำหรับผู้ผลิตในการผลิตข้อความจะถูกกำหนดโดยผู้บริโภคเพื่อตอบสนองความต้องการการบริโภคของผู้บริโภค
3. เป้าหมายสูงสุด
เราต้องการสร้างแพ็คเกจ JAR แยกต่างหากชื่อคิว-คอร์เพื่อให้การใช้งานเฉพาะของการพึ่งพาและเผยแพร่การสมัครสมาชิกสำหรับผู้ผลิตและผู้บริโภค
3. การส่งข้อความ
1. ผู้บริโภคให้อินเทอร์เฟซ
@topic (name = "kdyzm", producerId = "kdyzm_producer") อินเตอร์เฟสสาธารณะ userqueueresource {@tag ("test1") โมฆะสาธารณะที่จับสาธารณะ (@body @key ("userinfohandler" @tag ("test2") โมฆะสาธารณะที่จับต้องไม่ได้เนื่องจากหัวข้อและผู้ผลิตอยู่ในความสัมพันธ์ N: 1 ProducerID จึงถูกใช้โดยตรงเป็นคุณสมบัติของหัวข้อ Tag เป็นเงื่อนไขการกรองที่สำคัญมากและผู้บริโภคใช้เพื่อจำแนกข้อความเพื่อดำเนินการประมวลผลทางธุรกิจที่แตกต่างกันดังนั้น TAG จึงใช้เป็นเงื่อนไขการกำหนดเส้นทางที่นี่
2. ผู้ผลิตส่งข้อความโดยใช้ API ที่จัดทำโดยผู้บริโภค
เนื่องจากผู้บริโภคให้เพียงอินเทอร์เฟซสำหรับผู้ผลิตที่จะใช้จึงไม่มีวิธีใช้อินเทอร์เฟซโดยตรงเพราะไม่มีวิธีในการสร้างอินสแตนซ์ ที่นี่เราใช้พร็อกซีแบบไดนามิกเพื่อสร้างวัตถุ ใน API ที่จัดทำโดยผู้บริโภคให้เพิ่มการกำหนดค่าต่อไปนี้เพื่ออำนวยความสะดวกให้ผู้ผลิตในการนำเข้าการกำหนดค่าโดยตรงและใช้งาน ที่นี่เราใช้ Spring Config ตาม Java โปรดทราบ
@ConfigurationPublic คลาส queUeconfig {@autowired @bean public userqueeresource userQueueresource () {return queueresourcefactory.createProxyqueueresource -3. การห่อหุ้มคิว-คอร์สำหรับการส่งข้อความของผู้ผลิต
คำอธิบายประกอบทั้งหมดใน 1 ด้านบน (หัวข้อ, แท็ก, ร่างกาย, คีย์) และคลาส queueresourcefactory ที่ใช้ใน 2 ต้องกำหนดในคิว-คอร์ คำจำกัดความของคำอธิบายประกอบจะกำหนดกฎเท่านั้น การใช้งานจริงนั้นอยู่ใน QueueresourceFactory
นำเข้า java.lang.reflect.invocationhandler; นำเข้า java.lang.reflect.method; นำเข้า java.lang.reflect.proxy; นำเข้า org.slf4j.logger นำเข้า org.slf4j.loggerfactory; com.aliyun.openservices.ons.api.producer; นำเข้า com.aliyun.openservices.ons.api.sendresult; นำเข้า com.wy.queue.core.api.mqconnection; นำเข้า com.wy. com.wy.queue.core.utils.queuecorespringutils; queueresourcefactory public queueresourcefactory enchocationhandler {ส่วนตัว logger logger สุดท้ายคงที่ = loggerFactory.getLogger (queueresourceFactory.class); หัวข้อสตริงส่วนตัวชื่อ; Private String Producerid; private jacksonserializer serializer = new JackSonserializer (); private Static Final String คำนำหน้า = "pid_"; สาธารณะ queueresourceFactory (String topicName, String ProducerId) {this.topicName = topicName; this.producerid = producerId; } สาธารณะคงที่ <t> t createProxyqueUeresource (คลาส <t> clazz) {สตริงหัวข้อ name = mqutils.getTopicName (clazz); สตริง producerId = mqutils.getProducerId (clazz); t target = (t) proxy.newproxyinstance (queueresourcefactory.class.getclassloader (), คลาสใหม่ <?> [] {clazz}, queueresourcefactory ใหม่ (ชื่อหัวข้อ, ผู้ผลิต)); เป้าหมายกลับ; } @Override Object Public Invoke (Object Proxy, Method Method, Object [] args) โยน throwable {if (args.length == 0 || args.length> 1) {โยน runtimeException ใหม่ ("ยอมรับหนึ่งพารามิเตอร์ที่ queueresource interface"); } String tagname = mqutils.getTagname (เมธอด); ProducerFactory ProducerFactory = QueueCorespringutils.getBean (ProducerFactory.class); MQConnection ConnectionInfo = queUecorespringutils.getBean (mqconnection.class); ผู้ผลิตผู้ผลิต = ProducerFactory.CreateProducer (คำนำหน้า+ConnectionInfo.getPrefix ()+"_"+ProducerId); // ส่งข้อความข้อความ msg = ข้อความใหม่ (// // หัวข้อที่สร้างขึ้นในคอนโซลนั่นคือชื่อหัวข้อที่เป็นของข้อความ connectionInfo.getPrefix ()+"_"+หัวข้อชื่อ // ข้อความข้อความ // มันสามารถเข้าใจได้ รูปแบบไบนารีของข้อมูล MQ ไม่รบกวนใด ๆ // ผู้ผลิตและผู้บริโภคจำเป็นต้องเจรจาต่อรองการทำให้เป็นอนุกรมและวิธีการ deserialization ที่สอดคล้องกัน serializer.serialize (args [0]). getBytes ()); SENDRESULT SENDRESULT = PROKEER.SEND (MSG); logger.info ("ส่งความสำเร็จของข้อความรหัสข้อความคือ:" + sendresult.getMessageId ()); คืนค่า null; -ที่นี่เราได้โพสต์แพ็คเกจที่กำหนดเองเป็นพิเศษและชื่อแพ็คเกจที่ใช้โดยบุคคลที่สามเพื่ออำนวยความสะดวกความแตกต่าง
ทำอะไรที่นี่กันแน่?
กระบวนการส่งข้อความคือการสร้างวัตถุพร็อกซีบนพร็อกซีแบบไดนามิก วัตถุจะถูกสกัดกั้นเมื่อเรียกวิธีการ ก่อนอื่นแยกวิเคราะห์คำอธิบายประกอบทั้งหมดเช่นหัวข้อชื่อผู้ผลิตแท็กและข้อมูลคีย์อื่น ๆ จากคำอธิบายประกอบจากนั้นโทรไปที่อาลีบาบา SDK เพื่อส่งข้อความ กระบวนการนี้ง่ายมาก แต่โปรดทราบว่าเมื่อส่งข้อความที่นี่จะแบ่งออกเป็นสภาพแวดล้อม โดยทั่วไปแล้วองค์กรจะแยกแยะสภาพแวดล้อมสามอย่าง: QA การจัดเตรียมและผลิตภัณฑ์ ในหมู่พวกเขา QA และการจัดเตรียมเป็นสภาพแวดล้อมการทดสอบ สำหรับคิวข้อความมีสามวง อย่างไรก็ตามในสภาพแวดล้อม QA และสภาพแวดล้อมการจัดเตรียมมักใช้บัญชีอาลีบาบาเดียวกันเพื่อลดต้นทุนดังนั้นหัวข้อที่สร้างขึ้นและ ProductID จะถูกวางไว้ในพื้นที่เดียวกัน ด้วยวิธีนี้ชื่อหัวข้อที่มีชื่อเดียวกันไม่ได้รับอนุญาตให้มีอยู่ดังนั้นคำนำหน้าของสภาพแวดล้อมจะถูกเพิ่มเพื่อแยกความแตกต่างเช่น QA_TopicName, PID_STaging_ProducerId ฯลฯ ; นอกจากนี้ Queue-Core ยังมีอินเทอร์เฟซ MQConnection เพื่อรับข้อมูลการกำหนดค่าและบริการผู้ผลิตจำเป็นต้องใช้อินเทอร์เฟซนี้เท่านั้น
4. ผู้ผลิตส่งข้อความ
@AutoWired UserqueUerEsource UserQueueresource; @Override โมฆะสาธารณะ SendMessage () {USERMODEL USERMODEL = ใหม่ USERMODEL (); usermodel.setName ("kdyzm"); USERMODEL.SETAGE (25); userqueueresource.handleuserinfo (USERMODEL); -จำเป็นต้องใช้รหัสเพียงไม่กี่บรรทัดในการส่งข้อความไปยังหัวข้อที่ระบุซึ่งบางกว่ารหัสการส่งแบบดั้งเดิม
4. การบริโภคข่าว
เมื่อเทียบกับการส่งข้อความการใช้ข้อความมีความซับซ้อนมากขึ้น
1. การออกแบบการบริโภคข้อความ
เนื่องจากหัวข้อและผู้บริโภคเป็นความสัมพันธ์ n: n ผู้บริโภคจึงถูกวางไว้บนวิธีการใช้งานเฉพาะของผู้บริโภค
@controller@queueresourcepublic คลาส userqueueresourceimpl ใช้ userqueueresource {private logger logger = loggerFactory.getLogger (this.getClass ()); @Override @consumerannotation ("KDYZM_CONSUMER") โมฆะสาธารณะ HandleUserInfo (ผู้ใช้ USERMODEL) {logger.info ("ข้อความที่ 1 ได้รับ: {}", New GSON (). TOJSON (ผู้ใช้)); } @Override @consumerannotation ("KDYZM_CONSUMER1") โมฆะสาธารณะ HandleUserInfo1 (ผู้ใช้ USERMODEL) {logger.info ("ข้อความที่ 2 ได้รับ: {}", ใหม่ GSON (). TOJSON (ผู้ใช้)); -นี่คือสองคำอธิบายประกอบใหม่ @Queueresource และ @consumerannotation คำอธิบายประกอบทั้งสองนี้จะถูกกล่าวถึงในอนาคต บางคนอาจถามฉันว่าทำไมฉันควรใช้ชื่อผู้บริโภคแทนชื่อผู้บริโภคเพราะชื่อผู้บริโภคขัดแย้งกับชื่อใน SDK ที่ Aliyun จัดหาให้ - - -
ที่นี่ผู้บริโภคให้อินเทอร์เฟซ API แก่ผู้ผลิตเพื่ออำนวยความสะดวกให้ผู้ผลิตส่งข้อความและผู้บริโภคใช้อินเทอร์เฟซเพื่อใช้ข้อความที่ส่งโดยผู้ผลิต วิธีการใช้อินเทอร์เฟซ API คือการใช้การตรวจสอบซึ่งเป็นตรรกะที่ค่อนข้างสำคัญ
2.Queue-Core ใช้ตรรกะหลักของการฟังคิวข้อความ
ขั้นตอนที่ 1: ใช้วิธีการฟังของคอนเทนเนอร์สปริงเพื่อรับถั่วทั้งหมดด้วยคำอธิบายประกอบ Queueresource
ขั้นตอนที่ 2: แจกจ่ายถั่วแปรรูป
จะจัดการกับถั่วเหล่านี้ได้อย่างไร? ถั่วแต่ละตัวเป็นวัตถุจริงๆ ด้วยวัตถุเช่นวัตถุ UserQueUerEsourceImpl ในตัวอย่างข้างต้นเราสามารถรับออบเจ็กต์อินเทอร์เฟซไบต์ที่ใช้งานโดยวัตถุจากนั้นรับหมายเหตุประกอบในอินเตอร์เฟส userqueuererece และคำอธิบายประกอบในวิธีการและวิธีการ แน่นอนว่าคำอธิบายประกอบเกี่ยวกับวิธีการใช้งานของผู้ใช้ UserQueueresourceImpl สามารถรับได้ ที่นี่ฉันจะใช้ consumerId เป็นคีย์และข้อมูลที่เกี่ยวข้องที่เหลือจะถูกห่อหุ้มเป็นค่าและแคชลงในวัตถุแผนที่ รหัสหลักมีดังนี้:
คลาส <s?> clazz = resourceimpl.getClass (); คลาส <?> clazzif = clazz.getInterfaces () [0]; วิธี [] วิธีการ = clazz.getMethods (); String topicName = mqutils.getTopicName (clazzif); สำหรับ (วิธีการ M: วิธีการ) {Consumerannotation Consumeranno = M.GetAnnotation (Consumerannotation.class); if (null == Consumeranno) {// logger.error ("method = {} ต้องการคำอธิบายประกอบของผู้บริโภค", m.getName ()); ดำเนินการต่อ; } string consumerId = consumeranno.value (); if (stringUtils.isEmpty (consuerId)) {logger.error ("method = {} consumerId ไม่สามารถเป็นโมฆะ", m.getName ()); ดำเนินการต่อ; } คลาส <?> [] parameterTypes = M.GetParameterTypes (); วิธีการ ResourceIfMethod = null; ลอง {ResourceIfMethod = clazzif.getMethod (m.getName (), parameterTypes); } catch (nosuchmethodexception | SecurityException e) {logger.error ("ไม่พบวิธี = {} ที่ super interface = {}.", m.getName (), clazzif.getCanonicalName (), e); ดำเนินการต่อ; } String tagname = mqutils.getTagname (ResourceifMethod); ผู้บริโภค Map.put (ConsuerId, MethodInfo ใหม่ (TopicName, Tagname, M)); -ขั้นตอนที่ 3: การดำเนินการบริโภคผ่านการสะท้อนกลับ
ขั้นแรกให้กำหนดเวลาของการดำเนินการสะท้อนการดำเนินการนั่นคือฟังข้อความใหม่
ประการที่สองวิธีการดำเนินการสะท้อนกลับ? ฉันจะไม่เข้าไปดูรายละเอียด รองเท้าเด็กที่มีรากฐานที่เกี่ยวข้องกับการสะท้อนรู้วิธีทำ รหัสหลักมีดังนี้:
MQConnection ConnectionInfo = queUecorespringutils.getBean (mqconnection.class); String topicPrefix = connectionInfo.getPrefix ()+"_"; String ConsumerIdPrefix = คำนำหน้า+connectionInfo.getPrefix ()+"_"; สำหรับ (String ConsumerId: ConsumerSmap.keyset ()) {methodInfo methodInfo = consumerSmap.get (ConsumerId); คุณสมบัติการเชื่อมต่อ properties = ConvertToproperties (ConnectionInfo); // ID ผู้บริโภคที่คุณสร้างในคอนโซล ConnectionProperties.put (PropertyKeyConst.ConsumerId, ConsumerIdPrefix+ConsumerId); ผู้บริโภคผู้บริโภค = onsfactory.createConsumer (ConnectionProperties); consumer.subscribe (TopicPrefix+MethodInfo.getTopicName (), MethodInfo.getTagname (), New MessageListener () {// สมัครสมาชิกการใช้การกระทำสาธารณะหลายแท็ก topic = {}, tag = {}, consumerId = {}, message = {} ", topicPrefix+methodInfo.getTopicName (), methodInfo.getTagname (), consumerIdPrefix arg = jacksonserializer.deserialize (Message Body, ParameterType); consumer.start (); logger.info ("ผู้บริโภค = {} เริ่มต้นขึ้นแล้ว", ConsumerIdPrefix+ConsumerId); -5. ดูลิงค์ Git ด้านล่างสำหรับรหัสที่สมบูรณ์
https://github.com/kdyzm/queue-core.git
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น