เราเขียนเกี่ยวกับการส่งและรับข้อความผ่านคิวที่มีชื่อ หากคุณยังไม่รู้โปรดคลิก: เริ่มต้นใช้งานกับ RabbitMQ Java ในบทความนี้เราจะสร้างคิวงานเพื่อแจกจ่ายงานที่ใช้เวลานานในหมู่ผู้บริโภค
ภารกิจหลักของคิวงานคือการหลีกเลี่ยงการดำเนินงานที่ต้องใช้ทรัพยากรอย่างเข้มข้นทันทีจากนั้นต้องรอให้พวกเขาเสร็จสมบูรณ์ แต่เราดำเนินการจัดตารางงาน: เราห่อหุ้มงานเป็นข้อความและส่งไปยังคิว งานทำงานในพื้นหลังและลบงานอย่างต่อเนื่องจากคิวแล้วดำเนินการ เมื่อคุณเรียกใช้กระบวนการทำงานหลายอย่างงานในคิวงานจะถูกแชร์โดยกระบวนการของผู้ปฏิบัติงาน
แนวคิดดังกล่าวมีประโยชน์อย่างมากในเว็บแอปพลิเคชันเมื่อต้องดำเนินการที่ซับซ้อนระหว่างคำขอ HTTP สั้น ๆ
1. เตรียม
เราใช้ Thread.sleep เพื่อจำลองงานที่ใช้เวลานาน เราเพิ่มจำนวนคะแนนที่แน่นอนในตอนท้ายของข้อความที่ส่งไปยังคิวแต่ละจุดหมายความว่าใช้เวลา 1 วินาทีในเธรดคนงานเช่นสวัสดี ... จะต้องรอ 3 วินาที
ผู้ส่ง:
newtask.java
นำเข้า java.io.ioException; นำเข้า com.rabbitmq.client.channel; นำเข้า com.rabbitmq.client.connection; นำเข้า com.rabbitmq.client.connectionfactory; คลาสสาธารณะ newtask {// ชื่อส่วนตัว Channel ConnectionFactory Factory = new ConnectionFactory (); Factory.Sethost ("localhost"); การเชื่อมต่อการเชื่อมต่อ = Factory.newConnection (); ช่องสัญญาณ = การเชื่อมต่อ. createchannel (); // ประกาศช่องทางคิว i ++) {สตริงจุด = "" สำหรับ (int j = 0; j <= i; j ++) {dots+= ".";} ข้อความสตริง = "helloWorld"+dots+dots.length (); channel.basicpublish ("" "'");} // ปิดช่องสัญญาณและช่องทางทรัพยากร close (); connection.close ();}}ผู้รับ:
work.java
นำเข้า com.rabbitmq.client.channel; นำเข้า com.rabbitmq.client.connection; นำเข้า com.rabbitmq.client.connectionfactory; นำเข้า com.rabbitmq.client.queueingconsumer; java.io.ioException, java.lang.InterruptedException {// แยกแยะระหว่างกระบวนการคนงานที่แตกต่างกัน int hashCode = work.class.hashCode (); // สร้างการเชื่อมต่อและช่องสัญญาณเชื่อมต่อโรงงาน = การเชื่อมต่อใหม่ คิว channel.queuedeclare (queue_name, false, false, false, null); system.out.println (hashcode + "[*] กำลังรอข้อความเพื่อออกจากการกด Ctrl + C"); queueingConsumer censumer = queueingConsumer ในขณะที่ (จริง) {queueingConsumer.Delivery Delivery = Cumener.nextDelivery (); ข้อความสตริง = สตริงใหม่ (deliver.getBody ()); system.out.println (hashCode + "[x] ได้รับ '" + ข้อความ + "'); dowork (ข้อความ); @Param task * @throws interruptedException */private static void dowork (งานสตริง) พ่น InterruptedException {สำหรับ (char ch: task.tochararray ()) {ถ้า (ch == '.') thread.sleep (1,000);}}}}}} การส่งต่อรอบโรบิน
ข้อดีของการใช้คิวงานคือพวกเขาสามารถทำงานแบบขนานได้อย่างง่ายดาย หากเรามีงานค้างงานจำนวนมากเราสามารถแก้ปัญหาได้โดยการเพิ่มคนงานมากขึ้นทำให้ระบบปรับขนาดได้มากขึ้น
ต่อไปเราจะเรียกใช้อินสแตนซ์คนงาน 3 คน (work.java) ก่อนจากนั้นเรียกใช้ newtask.java อินสแตนซ์ของคนงานทั้งสามจะได้รับข้อมูล แต่จะจัดสรรได้อย่างไร? มาดูผลลัพธ์ผลลัพธ์:
[x] ส่ง 'helloworld.1' [x] ส่ง 'helloWorld..2' [x] ส่ง 'helloWorld ... 3' [x] ส่ง 'helloWorld ...... 4' [x] ส่ง 'Helloworld ...... 5' 'HelloWorld .......... 10' คนงาน 1: 605645 [*] กำลังรอข้อความ เพื่อออกจากกด Ctrl+C605645 [x] ได้รับ 'HelloWorld.1'605645 [x] Done605645 [x] ได้รับ' HelloWorld .... 4'605645 [X] DOD605645 [X] ได้รับ 'Helloworld ......... 7'605645 'HelloWorld .......... 10'605645 [x] คนงานทำ 2: 18019860 [*] กำลังรอข้อความ หากต้องการออกจากกด Ctrl+C18019860 [x] ได้รับ 'HelloWorld..2'18019860 [X] DONE18019860 [x] ได้รับ' Helloworld ..... 5'18019860 [x] เสร็จแล้ว 18019860 [x] ได้รับ [*] รอข้อความ หากต้องการออกจากกด Ctrl+C18019860 [x] ได้รับ 'HelloWorld ... 3'18019860 [X] DONE18019860 [X] ได้รับ' Helloworld ...... 6'18019860 [x] เสร็จแล้ว 18019860
อย่างที่คุณเห็นโดยค่าเริ่มต้น RabbitMQ จะส่งข้อมูลไปยังผู้บริโภครายต่อไปโดยไม่คำนึงถึงระยะเวลาของแต่ละงาน ฯลฯ และเป็นการจัดสรรครั้งเดียวไม่ใช่การจัดสรรแบบหนึ่งต่อหนึ่ง โดยเฉลี่ยแล้วผู้บริโภคแต่ละรายจะได้รับข้อมูลจำนวนเท่ากัน วิธีการกระจายข้อความนี้เรียกว่า Round-Robin
2. Messageacknowledgments
ใช้เวลาหลายวินาทีในการทำงาน คุณอาจกังวลเกี่ยวกับการหยุดชะงักเมื่อคนงานทำงาน ในรหัสของเราด้านบนเมื่อ RabbitMQ ส่งข้อความไปยังผู้บริโภคมันจะลบข้อมูลนี้ออกจากหน่วยความจำทันที ในกรณีนี้หากคนงานคนหนึ่งที่ปฏิบัติงานถูกฆ่าตายเราจะสูญเสียข้อมูลที่กำลังประมวลผล เราจะสูญเสียข้อความที่ส่งต่อไปยังคนงานนี้และยังไม่ได้ดำเนินการ
ในตัวอย่างด้านบนเราเริ่มงานสองงานก่อนจากนั้นเรียกใช้รหัสที่ส่งงาน (newtask.java) จากนั้นปิดงานที่สองทันทีและผลลัพธ์คือ:
คนงาน 2: 31054905 [*] Waitingformessages.toExitPressCtrl+C 31054905 [x] ได้รับ 'Helloworld..2' 31054905 [X] เสร็จสิ้น 31054905 [x] ได้รับ 18019860 [x] ได้รับ 'helloworld.1' 18019860 [x] เสร็จสิ้น 18019860 [x] ได้รับ 'helloworld ... 3' 18019860 [x] เสร็จแล้ว 18019860 [x] ได้รับ '......... 5' 18019860 18019860 [x] เสร็จแล้ว 18019860 [x] ได้รับ 'Helloworld ......... 9' 18019860 [x] เสร็จสิ้น
จะเห็นได้ว่าคนงานที่สองสูญเสียงานอย่างน้อย 6, 8 และ 10 และงาน 4 ยังไม่เสร็จสมบูรณ์
อย่างไรก็ตามเราไม่ต้องการสูญเสียงานใด ๆ (ข้อมูล) เมื่อคนงาน (ผู้รับ) ถูกฆ่าตายเราต้องการส่งงานไปยังคนงานคนอื่น
เพื่อให้แน่ใจว่าข้อความจะไม่สูญหาย RabbitMQ รองรับการตอบรับข้อความ ผู้บริโภคส่งการตอบกลับไปยัง RabbitMQ โดยบอกว่าได้รับและประมวลผลข้อมูลแล้ว RabbitMQ สามารถลบข้อมูลได้อย่างอิสระ
หากผู้บริโภคถูกฆ่าโดยไม่ส่งคำตอบ RabbitMQ จะสมมติว่าข้อมูลยังไม่ได้รับการประมวลผลอย่างสมบูรณ์และจะถูกนำไปยังผู้บริโภครายอื่น ด้วยวิธีนี้คุณสามารถยืนยันได้ว่าข้อมูลจะไม่สูญหายแม้ว่าผู้บริโภคจะถูกฆ่าตายเป็นครั้งคราว
กลไกนี้ไม่ได้หมายความว่าการหมดเวลาไม่ใช่กรณี RabbitMQ จะโพสต์ข้อมูลนี้อีกครั้งเมื่อการเชื่อมต่อผู้บริโภคถูกตัดการเชื่อมต่อ ได้รับอนุญาตหากผู้บริโภคใช้เวลานานในการประมวลผลข้อความ
การตอบกลับข้อความเป็นค่าเริ่มต้น ในรหัสข้างต้นเราปิดกลไกนี้โดยการตั้งค่า autoask = true ตามที่แสดง ลองแก้ไขรหัส (work.java):
บูลีน ACK = FALSE; // เปิดช่องกลไกการตอบสนอง BasicConsume (queue_name, ACK, ผู้บริโภค); // นอกจากนี้คุณต้องส่งการตอบกลับด้วยตนเองหลังจากการประมวลผลแต่ละครั้งเสร็จสิ้นข้อความ channel.basicack (deliver.getenvelope (). getDeliveryTag (), false);
งานดัดแปลงอย่างสมบูรณ์ Java
นำเข้า com.rabbitmq.client.channel; นำเข้า com.rabbitmq.client.connection; นำเข้า com.rabbitmq.client.connectionfactory; นำเข้า com.rabbitmq.client.queueingconsumer; java.io.ioException, java.lang.InterruptedException {// แยกแยะระหว่างกระบวนการคนงานที่แตกต่างกัน int hashCode = work.class.hashCode (); // สร้างการเชื่อมต่อและช่องสัญญาณเชื่อมต่อโรงงาน = การเชื่อมต่อใหม่ queue channel.queuedeclare (queue_name, false, false, false, null); system.out.println (hashcode + "[*] กำลังรอข้อความเพื่อออกจากการกด Ctrl + C"); คิว ack -ack -acke channel.basicconsume (queue_name, ack, ผู้บริโภค); ในขณะที่ (จริง) {queueingConsumer.Delivery Delivery = compener.nextDelivery (); ข้อความสตริง = สตริงใหม่ (deliver.getBody ()); system.out.println (hashcode + "[x] ได้รับ '" ข้อความ + ""); dowork (ข้อความ); channel.basicack (deliver.getenvelope (). getdeliverytag (), false);}}}ทดสอบ:
เราเปลี่ยนจำนวนข้อความเป็น 5 จากนั้นเปิดผู้บริโภคสองคน (work.java) จากนั้นส่งงาน (newtask.java) ปิดผู้บริโภคหนึ่งรายทันทีและสังเกตผลลัพธ์:
[x] Sent'helloworld..2 '[x] Sent'helloworld ... 3' [x] Sent'helloworld ... 4 '[x] Sent'helloworld ...... 5' Worker2 18019860 [*] Waitingformessages.toexitpressctrl+C 18019860 [x] ได้รับ 18019860 [x] ได้รับ 'Helloworld .... 4' คนงาน 1 31054905 [*] Waitingformessages.ToExitPressCtrl+C 31054905 [X] ได้รับ 'Helloworld.1' 31054905 [x] ทำ 31054905 [x] 31054905 [x] ได้รับ 'Helloworld .... 5' 31054905 [x] เสร็จแล้ว 31054905 [x] ได้รับ 'Helloworld .... 4' 31054905 [x] เสร็จสิ้น
คุณจะเห็นได้ว่าภารกิจ 4 ที่ไม่สมบูรณ์โดย Worker 2 ถูกส่งต่อไปยัง Worker 1 อีกครั้งเพื่อให้เสร็จสิ้น
3. ความทนทานที่ส่งข้อความ
เราได้เรียนรู้ว่าแม้ว่าผู้บริโภคจะถูกฆ่าตายข้อความจะไม่หายไป แต่ถ้าบริการ RabbitMQ หยุดในเวลานี้ข้อความของเราจะยังคงหายไป
เมื่อ RabbitMQ ออกหรือออกอย่างผิดปกติคิวและข้อมูลทั้งหมดจะหายไปเว้นแต่คุณจะบอกว่าจะไม่สูญเสียมัน เราต้องทำสองสิ่งเพื่อให้แน่ใจว่าข้อมูลจะไม่สูญหาย: เราจำเป็นต้องตั้งค่าสถานะถาวรสำหรับคิวและข้อความทั้งหมด
ก่อนอื่นเราต้องยืนยันว่า RabbitMQ จะไม่สูญเสียคิวของเรา ในการทำเช่นนี้เราต้องประกาศว่ามันคงอยู่
booleverable = true;
channel.queuedeclare ("task_queue", ทนทาน, เท็จ, เท็จ, null);
หมายเหตุ: RabbitMQ ไม่อนุญาตให้มีการกำหนดคิวใหม่ด้วยพารามิเตอร์ที่แตกต่างกันดังนั้นเราจึงไม่สามารถแก้ไขแอตทริบิวต์ของคิวที่มีอยู่แล้ว
ประการที่สองเราจำเป็นต้องระบุข้อมูลของเราอย่างถาวร ตั้งค่า MessageProperties (onplementBasicProperties) เป็นค่า presentent_text_plain
channel.basicpublish ("", "task_queue", messageproperties.persistent_text_plain, message.getBytes ());
ตอนนี้คุณสามารถเรียกใช้โปรแกรมที่ส่งข้อความจากนั้นปิดบริการรีสตาร์ทบริการและเรียกใช้โปรแกรมผู้บริโภคเพื่อทำการทดลอง
4. Fairdispatch
บางทีเราอาจจะพบว่ากลไกการส่งต่อข้อความปัจจุบัน (Round-Robin) ไม่ใช่สิ่งที่เราต้องการ ตัวอย่างเช่นในกรณีนี้สำหรับผู้บริโภคสองคนมีชุดของงานงานแปลก ๆ โดยเฉพาะอย่างยิ่งใช้เวลานานในขณะที่แม้งานจะง่ายซึ่งทำให้ผู้บริโภคคนหนึ่งยุ่งในขณะที่ผู้บริโภครายอื่นทำงานให้เสร็จสมบูรณ์และรอหลังจากเสร็จสิ้น
เหตุผลนี้คือ RabbitMQ ส่งต่อข้อความเฉพาะเมื่อข้อความมาถึงในคิว ไม่สนใจว่าผู้บริโภคจะไม่ได้ส่งคำตอบไปยัง RabbitMQ จำนวนเท่าใด เพียงแค่ส่งต่อตัวเลขคี่ทั้งหมดไปยังผู้บริโภครายหนึ่งและแม้แต่ตัวเลขไปยังผู้บริโภครายอื่น
เพื่อแก้ปัญหานี้เราสามารถใช้วิธี BasicQOS ผ่านพารามิเตอร์เป็น prefetchCount = 1 สิ่งนี้บอก RabbitMQ ไม่ให้ส่งข้อความมากกว่าหนึ่งข้อความให้กับผู้บริโภคในเวลาเดียวกัน กล่าวอีกนัยหนึ่งข้อความถัดไปจะถูกส่งเฉพาะเมื่อผู้บริโภคไม่ได้ใช้งาน
int prefetchCount = 1; channel.basicqos (prefetchCount);
หมายเหตุ: หากคนงานทุกคนไม่ว่างคิวของคุณอาจถูกเติมเต็ม คุณอาจสังเกตการใช้คิวจากนั้นเพิ่มพนักงานหรือใช้กลยุทธ์อื่น ๆ
ทดสอบ: เปลี่ยนรหัสเพื่อส่งข้อความเปลี่ยนจำนวนจุดสิ้นสุดของคะแนนเป็น 6-2 จากนั้นเริ่มคนงานสองคนก่อนจากนั้นส่งข้อความ:
[x] ส่ง 'helloWorld ...... 6' [x] ส่ง 'HelloWorld ..... 5' [x] ส่ง 'HelloWorld .... 4' [x] ส่ง 'HelloWorld ... 3' [x] ส่ง 'HelloWorld..2' คนงาน 1: 18019860 [*] รอข้อความ เพื่อออกจากกด Ctrl+C18019860 [x] ได้รับ 'HelloWorld ...... 6'18019860 [X] DONE18019860 [X] ได้รับ' HelloWorld ... 3'18019860 [x] ทำคนงาน 2: 31054905 [*] รอข้อความ เพื่อออกจากกด Ctrl+C31054905 [x] ได้รับ 'HelloWorld ...... 5'31054905 [x] Done31054905 [x] ได้รับ' HelloWorld ...... 4'31054905 [x] เสร็จสิ้น
จะเห็นได้ว่าข้อความไม่ได้ถูกส่งต่อตามกลไกรอบก่อนหน้านี้ในเวลานี้ แต่ถูกส่งต่อเมื่อผู้บริโภคไม่ว่าง นอกจากนี้ภายใต้โมเดลนี้ผู้บริโภคจะได้รับการสนับสนุนเพื่อเพิ่มแบบไดนามิกเนื่องจากข้อความไม่ได้ถูกส่งออกไปและการเพิ่มขึ้นของไดนามิกจะเพิ่มขึ้นทันที กลไกการส่งต่อเริ่มต้นจะทำให้เกิดขึ้นแม้ว่าผู้บริโภคจะถูกเพิ่มไปตามแบบไดนามิกข้อความได้รับการจัดสรรและไม่สามารถเพิ่มได้ทันทีแม้ว่าจะมีงานที่ยังไม่เสร็จ
5. รหัสสมบูรณ์
newtask.java
นำเข้า java.io.ioException; นำเข้า com.rabbitmq.client.channel; นำเข้า com.rabbitmq.client.connection; นำเข้า com.rabbitmq.client.connectionfactory; นำเข้า com.rabbitmq.client.messageproperties; โมฆะคงที่หลัก (String [] args) พ่น IOException {// สร้างการเชื่อมต่อและช่องสัญญาณเชื่อมต่อกับช่องสัญญาณ = new ConnectionFactory (); Factory.Sethost ("localhost"); การเชื่อมต่อการเชื่อมต่อ = Factory.newConnection () ช่องสัญญาณ = การเชื่อมต่อ channel.queuedeclare (queue_name, ทนทาน, เท็จ, เท็จ, null); // ส่ง 10 ข้อความต่อท้าย 1-10 คะแนนหลังจากข้อความในทางกลับกัน (int i = 5; i> 0; i--) {สตริงจุด = "" สำหรับ (int j = 0; j <= i; j ++) {dots += " MessageProperties 2. SET MESSANTE CHANNEL การคงอยู่ BASICPUBLISH ("", queue_name, MessageProperties.persistent_text_plain, message.getBytes ()); system.out.println ("[x] ส่ง '" + ข้อความ + "");work.java
นำเข้า com.rabbitmq.client.channel; นำเข้า com.rabbitmq.client.connection; นำเข้า com.rabbitmq.client.connectionfactory; นำเข้า com.rabbitmq.client.queueingconsumer; พ่น java.io.ioexception, java.lang.InterruptedException {// แยกความแตกต่างของผลลัพธ์ของกระบวนการคนงานที่แตกต่างกัน int hashCode = work.class.hashCode (); // สร้างการเชื่อมต่อ Connection.createchannel (); // ประกาศคิวบูลีนทนทาน = true; channel.queuedeclare (queue_name, ทนทาน, เท็จ, เท็จ, null); system.out.println (hashcode + "[*] การรอข้อความ 1; channel.basicqos (prefetchCount); คิวผู้บริโภคคิวผู้บริโภค = คิวใหม่ (ช่องทาง); // ระบุการบริโภคคิวบูลีน ACK = FALSE; // เปิดกลไกการตอบสนอง ในขณะที่ (จริง) {queueingConsumer.Delivery Delivery = consumer.nextDelivery (); ข้อความสตริง = สตริงใหม่ (deliver.getBody ()); system.out.println (hashCode + "[x] ได้รับ '" ข้อความ + "'); ทำ "); // channel.basicack (deliver.getenvelope (). getdeliverytag (), false); channel.basicack (deliver.getenvelope (). getdeliverytag (), false);}}/** * แต่ละจุดใช้เวลา 1S * * @param task.tochararray ()) {ถ้า (ch == '.') thread.sleep (1000);}}}สรุป
ข้างต้นเป็นคำอธิบายรายละเอียดทั้งหมดของรหัสคิวงาน Java ในบทความนี้ฉันหวังว่ามันจะเป็นประโยชน์กับทุกคน หากมีข้อบกพร่องใด ๆ โปรดฝากข้อความไว้เพื่อชี้ให้เห็น ขอบคุณเพื่อนที่ให้การสนับสนุนเว็บไซต์นี้!