ภาพรวม
แนวคิดพื้นฐาน
นายหน้า
เอนทิตีเซิร์ฟเวอร์คิวข้อความที่ใช้ในการประมวลผลข้อมูล
Vhost
โฮสต์ข้อความเสมือนจริงที่สร้างโดยเซิร์ฟเวอร์ RabbitMQ มีกลไกการอนุญาตของตนเอง สามารถเปิด Vhosts หลายตัวในโบรกเกอร์เพื่อแยกการอนุญาตสำหรับผู้ใช้ที่แตกต่างกันและ Vhosts ก็แยกได้อย่างสมบูรณ์
ผู้ผลิต
สร้างข้อมูลสำหรับการสื่อสารข้อความ
ช่อง
ช่องข้อความสามารถสร้างช่องหลายช่องใน AMQP แต่ละช่องแสดงงานเซสชัน
แลกเปลี่ยน
โดยตรง
ส่งต่อข้อความไปยังคิวที่ระบุโดยการกำหนดเส้นทางคีย์
คนบ้า
คนบ้า
การส่งต่อข้อความไปยังคิวที่ถูกผูกไว้ทั้งหมดนั้นคล้ายกับวิธีการออกอากาศ
หัวข้อ
หัวข้อ
ส่งต่อข้อความตามกฎ กฎนี้ส่วนใหญ่เป็นการจับคู่รูปแบบและมันก็มีความยืดหยุ่นมากขึ้น
คิว
คิว
ผูกพัน
มันแสดงถึงความสัมพันธ์ระหว่างสวิตช์และคิว เมื่อมีผลผูกพันมันมาพร้อมกับพารามิเตอร์ที่มีผลผูกพันคีย์เพื่อให้ตรงกับคีย์การกำหนดเส้นทาง
ผู้บริโภค
ฟังคิวข้อความเพื่ออ่านข้อมูลข้อความ
สามโหมดแลกเปลี่ยน (Fanout, Direct, Topic) การใช้งานภายใต้ Springboot
อ้างอิง Spring-Boot-Starter-AMQP ใน pom.xml
<การพึ่งพา> <roupId> org.springframework.boot </groupId> <ratifactid> Spring-Boot-Starter-AMQP </artifactId>
เพิ่มการกำหนดค่า RabbitMQ
ฤดูใบไม้ผลิ: RabbitMQ: โฮสต์: พอร์ต LocalHost: 5672 ชื่อผู้ใช้: ผู้เยี่ยมชมรหัสผ่าน: แขก
โดยตรง
ในโหมดตรงจะต้องมีเพียงคิวเพื่อกำหนดโดยทั่วไป ใช้สวิตช์ในตัว (DefaultExchange) โดยไม่ต้องเชื่อมโยงสวิตช์
@ConfigurationPublic คลาส rabbitp2pconfigure {public string สุดท้ายสตริงสุดท้าย queue_name = "p2p-queue"; @Bean สาธารณะคิวคิว () {ส่งคืนคิวใหม่ (queue_name, true); - @runwith (springrunner.class) @springboottest (classes = bootcoretestapplication.class)@slf4jpublic คลาส rabbittest {@autowired ส่วนตัว amqptemplate amqptemplate; / *** ส่ง*/ @Test สาธารณะโมฆะ sendlazy () พ่น InterruptedException {เมืองเมือง = เมืองใหม่ (2345566666L, "direct_name", "direct_code"); amqptemplate.convertandsend (rabbitlazyconfigure.queue_name, เมือง); } / *** รับโมฆะสาธารณะ* / @Test Public Void รับ () พ่น InterruptedException {Object obj = amqptemplate.receiveAndConvert (rabbitlazyconfigure.queue_name); assert.notnull (obj, ""); log.debug (obj.toString ()); -สถานการณ์ที่ใช้งานได้: จุดต่อจุด
คนบ้า
โหมด Fanout ต้องมีการผูกคิวหลายคิวกับสวิตช์เดียวกัน
@ConfigurationPublic คลาส rabbitfanoutconfigure {สตริงคงสุดท้ายสาธารณะ exchange_name = "Fanout-Exchange"; สตริงสุดท้ายคงที่ FANOUT_A = "FANOUT.A"; สตริงสุดท้ายคงที่ FANOUT_B = "FANOUT.B"; สตริงสุดท้ายคงที่ FANOUT_C = "FANOUT.C"; @Bean สาธารณะ amessage () {ส่งคืนคิวใหม่ (FANOUT_A); } @Bean สาธารณะคิว bmessage () {ส่งคืนคิวใหม่ (fanout_b); } @Bean สาธารณะคิว cMessage () {ส่งคืนคิวใหม่ (FANOUT_C); } @Bean Public FanoutExchange FanoutExchange () {ส่งคืน FanOutExchange ใหม่ (Exchange_Name); } @Bean สาธารณะ BindingExchangea (คิว amessage, fanoutExchange fanoutExchange) {ส่งคืน bindingBuilder.bind (amessage) .to (fanoutexchange); } @Bean สาธารณะ BindingExChangeB (คิว bmessage, fanoutExchange fanoutExchange) {ส่งคืน bindingBuilder.bind (bmessage) .to (fanoutExchange); } @Bean สาธารณะ BindingExchAngec (คิว cmessage, fanoutexchange fanoutexchange) {return bindingbuilder.bind (cmessage) .to (fanoutexchange); -ผู้ส่ง
@slf4jpublic ผู้ส่งคลาส {@autowired ส่วนตัว amqptemplate rabbittemplate; โมฆะสาธารณะ sendfanout (ข้อความวัตถุ) {log.debug ("เริ่มส่งข้อความ Fanout <" + ข้อความ + ">"); Rabbittemplate.convertandsend (rabbitfanoutconfigure.exchange_name, "", ข้อความ); -เราสามารถใช้ @RabBitListener เพื่อฟังหลายคิวเพื่อบริโภค
@slf4j @rabbitlistener (queues = {rabbitfanoutconfigure.fanout_a, rabbitfanoutconfigure.fanout_b, rabbitfanoutconfigure.fanout_c}) - สถานการณ์ที่เกี่ยวข้อง
-เกม Multi-user Online (MMO) ขนาดใหญ่สามารถใช้เพื่อจัดการกับกิจกรรมระดับโลกเช่นการอัปเดตการจัดอันดับ
- เว็บไซต์ข่าวกีฬาสามารถใช้เพื่อแจกจ่ายการอัปเดตคะแนนให้กับไคลเอนต์มือถือในเวลาใกล้เคียงจริง
- ระบบการกระจายใช้เพื่อออกอากาศสถานะต่างๆและการอัปเดตการกำหนดค่า
- ในระหว่างการแชทเป็นกลุ่มจะใช้เพื่อแจกจ่ายข้อความไปยังผู้ใช้ที่เข้าร่วมในการแชทเป็นกลุ่ม
หัวข้อ
รูปแบบนี้ค่อนข้างซับซ้อน พูดง่ายๆคือแต่ละคิวมีหัวข้อที่น่ากังวล ข้อความทั้งหมดมี "ชื่อ" Exchange จะส่งต่อข้อความไปยังคิวที่มีหัวข้อที่เกี่ยวข้องกับการจับคู่เส้นทางลวดลาย
เมื่อมีผลผูกพันให้จัดทำหัวข้อที่คิวเป็นห่วงเช่น "หัวข้อ# ("# "หมายถึง 0 หรือหลายคำหลักและ"*"หมายถึงคำหลัก)
@ConfigurationPublic คลาส rabbittopicConfigure {สตริงสุดท้ายคงที่ exchange_name = "หัวข้อการแลกเปลี่ยน"; public Static Final String topic = "topic"; สตริงสุดท้ายคงที่สาธารณะ topic_a = "topic.a"; สตริงสุดท้ายคงที่สาธารณะ topic_b = "topic.b"; @bean สาธารณะคิวคิว queuetopic () {ส่งคืนคิวใหม่ (rabbittopicconfigure.topic); } @Bean สาธารณะคิวคิว queuetopica () {ส่งคืนคิวใหม่ (rabbittopicConfigure.topic_a); } @Bean สาธารณะคิวคิว queuetopicb () {ส่งคืนคิวใหม่ (rabbittopicConfigure.topic_b); } @Bean Public Publicxchange Exchange () {TopicexChange TopicexChange = New TopicexChange (Exchange_Name); topicexchange.setDelayed (จริง); ส่งคืน topicexchange ใหม่ (Exchange_name); } @Bean การเชื่อมโยงสาธารณะ BindingExchAngetOpic (คิวคิว, การแลกเปลี่ยน topicexchange) {ส่งคืน bindingbuilder.bind (queuetopic) .to (แลกเปลี่ยน). with (rabbittopicconfigure.topic); } @Bean การเชื่อมโยงสาธารณะ BindingExchAngetopics (คิว queue queuetopica, topicexchange Exchange) {ส่งคืน bindingbuilder.bind (queuetopica) .to (แลกเปลี่ยน). with ("หัวข้อ.#"); -ในเวลาเดียวกันฟังสามคิว
@slf4j @rabbitlistener (queues = {rabbittopicconfigure.topic, rabbittopicconfigure.topic_a, rabbittopicconfigure.topic_b}) ตัวรับสัญญาณสาธารณะ {@rabbithandler -ผ่านการทดสอบเราสามารถหาได้
@runwith (springrunner.class) @springboottest (classes = bootcoretestapplication.class) คลาสสาธารณะ rabbittest {@autowired ส่วนตัว amqptemplate rabbittemplate; @Test โมฆะสาธารณะ SendALL () {RABBITTEMPLATE.CONVERTANDSEND (RABBITTOPICCONFIGURE.EXCHANGE_NAME, "TOPIC.TEST", "ส่งทั้งหมด"); } @Test โมฆะสาธารณะ sendTopic () {rabbittemplate.convertandsend (rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "ส่งหัวข้อ"); } @Test โมฆะสาธารณะ sendtopica () {rabbittemplate.convertandsend (rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "ส่งหัวข้อ"); - สถานการณ์ที่เกี่ยวข้อง
- แจกจ่ายข้อมูลเกี่ยวกับตำแหน่งทางภูมิศาสตร์ที่เฉพาะเจาะจงเช่นจุดขาย
- งานหลังเวทีเสร็จสมบูรณ์โดยคนงานหลายคนคนงานแต่ละคนรับผิดชอบในการจัดการงานเฉพาะบางอย่าง
- การอัปเดตราคาหุ้น (และการอัปเดตข้อมูลทางการเงินประเภทอื่น ๆ )
- การอัปเดตข่าวที่เกี่ยวข้องกับหมวดหมู่หรือแท็ก (ตัวอย่างเช่นสำหรับกีฬาหรือทีมเฉพาะ)
- การประสานงานของบริการประเภทต่าง ๆ ในคลาวด์
- แพคเกจซอฟต์แวร์สถาปัตยกรรม/ระบบแบบกระจายซึ่งผู้สร้างแต่ละคนสามารถจัดการสถาปัตยกรรมหรือระบบเฉพาะเพียงหนึ่งเดียวเท่านั้น
เลื่อนคิว
การบริโภคล่าช้า:
ลองล่าช้า:
ตั้งค่าคุณสมบัติ Switch Delay เป็น TRUE
@ConfigurationPublic คลาส rabbitlazyconfigure {สาธารณะคงที่สตริงสุดท้าย queue_name = "lazy-queue-t"; สาธารณะคงที่สตริงสุดท้าย exchange_name = "lazy-exchange-t"; @Bean สาธารณะคิวคิว () {ส่งคืนคิวใหม่ (queue_name, true); } @Bean Public DirectExChange DefaultExChange () {DirectExChange DirectExChange = ใหม่ DirectExChange (Exchange_Name, True, False); DirectExchange.setDelayed (จริง); ส่งคืน DirectExchange; } @Bean การเชื่อมโยงสาธารณะ () {return bindingBuilder.bind (queue ()). ถึง (defaultExchange ()) ด้วย (queue_name); -ตั้งค่าเวลาหน่วงเมื่อส่ง
@slf4jpublic ผู้ส่งคลาส {@autowired ส่วนตัว amqptemplate rabbittemplate; โมฆะสาธารณะ Sendlazy (Object msg) {log.debug ("เริ่มส่งข้อความขี้เกียจ <" + msg + ">"); Rabbittemplate.ConvertandSend (rabbitlazyconfigure.exchange_name, rabbitlazyconfigure.queue_name, msg, ข้อความ -> {message.getMessageProperties () Setheader ("X -Delay", 10000); -เสร็จ
โปรดตรวจสอบเอกสารอย่างเป็นทางการโดยตรงสำหรับกรณีการใช้งานต่างๆ
ข้างต้นเป็นเนื้อหาทั้งหมดของบทความนี้ ฉันหวังว่ามันจะเป็นประโยชน์ต่อการเรียนรู้ของทุกคนและฉันหวังว่าทุกคนจะสนับสนุน wulin.com มากขึ้น