Kami menulis tentang mengirim dan menerima pesan melalui antrian bernama. Jika Anda belum tahu, silakan klik: Memulai dengan RabbitMQ Java. Dalam artikel ini, kami akan membuat antrian kerja untuk mendistribusikan tugas yang memakan waktu di antara konsumen.
Tugas utama dari antrian kerja adalah untuk menghindari segera melakukan tugas-tugas intensif sumber daya dan kemudian harus menunggu mereka menyelesaikannya. Sebaliknya, kami melakukan penjadwalan tugas: kami merangkum tugas sebagai pesan dan mengirimkannya ke antrian. Pekerjaan dijalankan di latar belakang dan terus -menerus menghapus tugas dari antrian dan kemudian menjalankannya. Saat Anda menjalankan beberapa proses pekerja, tugas dalam antrian tugas akan dibagikan oleh proses pekerja.
Konsep seperti itu sangat berguna dalam aplikasi web ketika tugas kompleks harus dilakukan antara permintaan HTTP yang sangat singkat.
1. Persiapkan
Kami menggunakan thread.sleep untuk mensimulasikan tugas yang memakan waktu. Kami menambahkan sejumlah poin di akhir pesan yang dikirim ke antrian, setiap poin berarti butuh 1 detik di utas pekerja, seperti halo ... harus menunggu 3 detik.
Pengirim:
Newtask.java
Impor java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; class public newTask {// quueue name final string statis final quleue_name = "workqueue"; {// Buat koneksi dan saluran ConnectionFactory factory = new connectionFactory (); factory.sethost ("localhost"); koneksi koneksi = factory.newconnection (); channel channel = connection.createChannel (); // menyatakan poin-poin. = 0; i <10; pesan + "'");} // tutup saluran dan sumber daya saluran.close (); connection.close ();}}Penerima:
Work.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Work{//Quote name private final static String QUEUE_NAME = "workqueue";public static void main(String[] argv) throws java.io.ioException, java.lang.interruptedException {// membedakan antara proses pekerja yang berbeda int hashcode = work.class.hashcode (); // Connection dan saluran koneksi Factory = connection connectionFactory (); factory.sethost ("localhost"); connection connection = new factore (); factory.sethost ("localhost"); connection connection = new factore (); factory. connection.createChannel();//Declare the queue channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);// Specify the consumption queue channel.basicconsume (queue_name, true, konsumen); while (true) {queueingConsumer.delivery Delivery = consumer.nextDelivery (); string message = new string (Delivery.getBody ()); System.out.println (hashCode + "[x] diterima '" + "' ' *" doWork (pesan); System.out.println (hashcode 1 "" DOWORK (pesan); System.out.println (hashcode + "HASHCODE +" [pesan); System); System. @param Task * @throws InterruptedException */private static void dowork (string Task) melempar interruptedException {for (char ch: task.tochararray ()) {if (ch == '.') Thread.sleep (1000);}}} Penerusan round-robin
Keuntungan menggunakan antrian tugas adalah bahwa mereka dapat bekerja secara paralel dengan mudah. Jika kami memiliki banyak backlog kerja, kami dapat menyelesaikan masalah dengan menambahkan lebih banyak pekerja, membuat sistem lebih dapat diukur.
Selanjutnya, pertama -tama kita akan menjalankan 3 pekerja (Work.java), dan kemudian menjalankan newtask.java. Tiga instance pekerja akan mendapatkan informasi. Tapi bagaimana cara mengalokasikan? Mari kita lihat hasil output:
[x] Sent 'helloworld.1'[x] Sent 'helloworld..2'[x] Sent 'helloworld...3'[x] Sent 'helloworld......4'[x] Sent 'helloworld......5'[x] Sent 'helloworld......6'[x] Sent 'helloworld.......7'[x] Sent 'helloworld.......8'[x] Sent 'helloworld.........9'[x] Sent 'HelloWorld .......... 10' Pekerja 1: 605645 [*] Menunggu pesan. Untuk keluar tekan ctrl+c605645 [x] menerima 'helloWorld.1'605645 [x] Done605645 [x] Menerima' HelloWorld .... 4'605645 [x] Done605645 [x] Diterima 'HelloWorld ......... 7'605645 [X] [X] [x] [X] Diterima [x] ... X. 7'605645 [X] [X] [x] [x] [x] Diterima ... 'HelloWorld .......... 10'605645 [x] Selesai Pekerja 2: 18019860 [*] Menunggu pesan. To exit press CTRL+C18019860 [x] Received 'helloworld..2'18019860 [x] Done18019860 [x] Received 'helloworld.....5'18019860 [x] Done18019860 [x] Received 'helloworld.........8'18019860 [x] Done Worker 3: 18019860 [*] Menunggu pesan. Untuk keluar tekan ctrl+c18019860 [x] menerima 'helloWorld ... 3'18019860 [x] Done18019860 [x] menerima' helloworld ...... 6'18019860 [x] Done18019860 [x] Diterima 'Helloworld ...... 9'. 9'1801980 [x] Diterima' Helloworld ...... 9'. 9'D1801980 [x] Diterima 'Helloworld ...... 9'. 9'D1801980 [x] Diterima' Helloworld ...... 9'.9'Der
Seperti yang Anda lihat, secara default, RabbitMQ akan mengirim informasi ke konsumen berikutnya satu per satu, terlepas dari durasi setiap tugas, dll., Dan itu adalah alokasi satu kali, bukan alokasi satu per satu. Rata -rata, setiap konsumen akan menerima informasi yang sama. Cara mendistribusikan pesan ini disebut round-robin.
2. MessageKeKnowledgments
Dibutuhkan beberapa detik untuk melakukan tugas. Anda mungkin khawatir tentang gangguan ketika seorang pekerja melakukan tugas. Dalam kode kami di atas, setelah RabbitMQ mengirimkan pesan kepada konsumen, itu akan segera menghapus informasi ini dari memori. Dalam hal ini, jika salah satu pekerja yang melakukan tugas terbunuh, kami akan kehilangan informasi yang sedang diproses. Kami juga akan kehilangan pesan yang telah diteruskan ke pekerja ini dan belum dieksekusi.
Dalam contoh di atas, pertama -tama kita memulai dua tugas, kemudian menjalankan kode yang mengirimkan tugas (newtask.java), dan kemudian segera tutup tugas kedua, dan hasilnya adalah:
Pekerja 2: 31054905 [*] WaitingFormessages. 18019860 [x] Menerima'Helloworld.1 '18019860 [x] Dilakukan 18019860 [x] Diterima' Helloworld ... 3 '18019860 [x] Dilakukan 18019860 [x] Diterima' Diterima ......... 5 '18019860 [x]' 'Done 18019 ... 6. 5' 18019860 [X] [x] 601980 ... x] ......... 5 '18019860 [x]' Done 18019 ... 7. x] [.. 5 '18019860 [x] Done 18019 ... 6080 ... x] [5' 18019860 [x] 'DO 18019. 7. llowld ......... 5' 18019860 [x] Done 18019 ... 6. x] [.. 5 '18019860 [x] Done 18019 ... 6. x] [.. 5' 18019860 [x] Done 18019 18019860 [x] selesai 18019860 [x] Diterima 'Helloworld ......... 9' 18019860 [x] selesai
Dapat dilihat bahwa pekerja kedua kehilangan setidaknya tugas 6, 8, dan 10, dan tugas 4 tidak selesai.
Namun, kami tidak ingin kehilangan tugas (informasi) apa pun. Ketika seorang pekerja (penerima) terbunuh, kami ingin memberikan tugas kepada pekerja lain.
Untuk memastikan bahwa pesan tidak akan pernah hilang, RabbitMQ mendukung pengakuan pesan. Konsumen mengirimkan balasan ke RabbitMQ, mengatakan bahwa informasi tersebut telah diterima dan diproses, dan kemudian RabbitMQ dapat dengan bebas menghapus informasi tersebut.
Jika konsumen terbunuh tanpa mengirim balasan, RabbitMQ akan berasumsi bahwa informasi tersebut belum diproses sepenuhnya dan akan diarahkan ke konsumen lain. Dengan cara ini, Anda dapat mengonfirmasi bahwa informasi tidak hilang, bahkan jika konsumen kadang -kadang terbunuh.
Mekanisme ini tidak berarti bahwa batas waktu tidak terjadi. RabbitMQ hanya menyalakan kembali informasi ini ketika koneksi konsumen terputus. Diizinkan jika konsumen membutuhkan waktu yang sangat lama untuk memproses pesan.
Balas pesan aktif secara default. Dalam kode di atas, kami mematikan mekanisme ini dengan mengatur autoask = true seperti yang ditunjukkan. Mari kita ubah kode (work.java):
boolean ack = false; // buka mekanisme respons. // Selain itu, Anda perlu mengirim balasan secara manual setelah setiap pemrosesan menyelesaikan pesan. channel.basicack (Delivery.getEnvelope (). getDeliveryTag (), false);
Pekerjaan yang sepenuhnya dimodifikasi.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Work{//Quote name private final static String QUEUE_NAME = "workqueue";public static void main(String[] argv) throws java.io.ioException, java.lang.interruptedException {// membedakan antara proses pekerja yang berbeda int hashcode = work.class.hashcode (); // Connection dan saluran koneksi Factory = connection connectionFactory (); factory.sethost ("localhost"); connection connection = new factore (); factory.sethost ("localhost"); connection connection = new factore (); factory. connection.createChannel();//Declare the queue channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);// Specify the consumption queue Boolean ack = false ; // buka saluran mekanisme respons.basicconsume (queue_name, ack, konsumen); while (true) {queueingConsumer.delivery Delivery = consumer.nextDelivery (); string message = new string (Delivery.getBody ()); System.out.println (hashCode + "[x] diterima '" + pesan + "'"); doWork (pesan); System.out.println (hashcode + "(pesan); channel.basicack (Delivery.getEnvelope (). getDeliveryTag (), false);}}}tes:
Kami mengubah jumlah pesan menjadi 5, lalu membuka dua konsumen (work.java), lalu mengirim tugas (newtask.java), segera tutup satu konsumen, dan amati output:
[x] terkirim'Helloworld..2 '[x] terkirim'helloRld ... 3' [x] terkirim'hellororld ... 4 '[x] terkirim'helloworld ...... 5' pekerja 18019860 [*] waitingFormessages.toExitpressctrl+C 1801986 [x] menerima'Helloworlorldororldorld+18019.1801980 [X] [x] menerima'Helloworlorldorld+18019.18019.dle [x] [x] Diterima. 18019860 [x] Diterima'Hellorld .... 4 'Worker1 31054905 [*] tunggu tunggu. 31054905 [x] Diterima 'Helloworld .... 5' 31054905 [x] Dilakukan 31054905 [x] Diterima 'Helloworld .... 4' 31054905 [x] Selesai
Anda dapat melihat bahwa Tugas 4 yang tidak diselesaikan oleh Pekerja 2 dipicu kembali ke Pekerja 1 untuk penyelesaian.
3. Daya Daya Berbahasa
Kami telah belajar bahwa bahkan jika konsumen terbunuh, pesannya tidak akan hilang. Tetapi jika layanan RabbitMQ dihentikan saat ini, pesan kami masih akan hilang.
Ketika RabbitMQ keluar atau keluar secara tidak normal, semua antrian dan informasi akan hilang kecuali Anda mengatakannya untuk tidak kehilangannya. Kita perlu melakukan dua hal untuk memastikan bahwa informasi tidak hilang: kita perlu menetapkan bendera persisten untuk semua antrian dan pesan.
Pertama, kita perlu mengkonfirmasi bahwa RabbitMQ tidak akan pernah kehilangan antrian kita. Untuk melakukan ini, kita perlu menyatakannya sebagai gigih.
booleandable = true;
channel.queuedeclare ("Task_queue", tahan lama, salah, salah, null);
Catatan: RabbitMQ tidak mengizinkan mendefinisikan ulang antrian dengan parameter yang berbeda, jadi kami tidak dapat memodifikasi atribut antrian yang sudah ada.
Kedua, kita perlu mengidentifikasi informasi kita sebagai gigih. Atur nilai MessageProPerties (implementBasicProperties) ke persistent_text_plain.
channel.basicpublish ("", "Task_queue", MessageProPerties.Persistent_text_plain, message.getbytes ());
Sekarang Anda dapat menjalankan program yang mengirim pesan, lalu tutup layanan, memulai kembali layanan, dan menjalankan program konsumen untuk melakukan percobaan.
4. Fairdispatch
Mungkin kita akan menemukan bahwa mekanisme penerusan pesan saat ini (round-robin) bukanlah yang kita inginkan. Misalnya, dalam hal ini, untuk dua konsumen, ada serangkaian tugas, tugas aneh sangat memakan waktu, sementara bahkan tugas itu mudah, yang menyebabkan satu konsumen sibuk sementara konsumen lain untuk dengan cepat menyelesaikan tugas dan menunggu setelah selesai.
Alasan untuk ini adalah bahwa RabbitMQ hanya meneruskan pesan ketika pesan tiba di antrian. Tidak peduli berapa banyak tugas yang tidak dikeluarkan konsumen untuk membalas RabbitMQ. Hanya meneruskan semua angka ganjil ke satu konsumen dan bahkan angka ke konsumen lain.
Untuk mengatasi masalah ini, kita dapat menggunakan metode BasicQOS, meneruskan parameter sebagai prefetchCount = 1. Ini memberi tahu RabbitMQ untuk tidak memberikan lebih dari satu pesan kepada konsumen secara bersamaan. Dengan kata lain, pesan berikutnya akan dikirim hanya ketika konsumen menganggur.
int prefetchcount = 1; channel.basicqos (prefetchcount);
Catatan: Jika semua pekerja sibuk, antrian Anda dapat diisi. Anda mungkin mengamati penggunaan antrian dan kemudian menambahkan pekerja, atau menggunakan beberapa strategi lainnya.
Tes: Ubah kode untuk mengirim pesan, ubah jumlah akhir poin menjadi 6-2, lalu mulai kedua pekerja terlebih dahulu, dan kemudian kirim pesan:
[x] Mengirim 'HelloWorld ...... 6' [x] Mengirim 'HelloWorld ..... 5' [x] mengirim 'HelloWorld .... 4' [x] mengirim 'helloworld ... 3' [x] mengirim 'helloworld..2' pekerja 1: 18019860 [*] menunggu pesan. Untuk keluar dari tekan Ctrl+C18019860 [x] menerima 'HelloWorld ...... 6'18019860 [x] Done18019860 [x] Menerima' HelloWorld ... 3'18019860 [x] Selesai Pekerja 2: 31054905 [*] Menunggu pesan. Untuk keluar tekan CTRL+C31054905 [x] menerima 'HelloWorld ...... 5'31054905 [x] Done31054905 [x] Menerima' HelloWorld ...... 4'31054905 [x] Selesai
Dapat dilihat bahwa pesan itu tidak diteruskan sesuai dengan mekanisme round-robin sebelumnya saat ini, tetapi diteruskan ketika konsumen tidak sibuk. Selain itu, di bawah model ini, konsumen didukung untuk meningkat secara dinamis karena pesan tidak dikirim, dan peningkatan dinamis meningkat segera. Mekanisme penerusan default akan menyebabkan, bahkan jika konsumen ditambahkan secara dinamis, pesan telah dialokasikan dan tidak dapat ditambahkan segera, bahkan jika ada banyak tugas yang belum selesai.
5. Kode Lengkap
Newtask.java
Impor java.io.ioException; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; import com.rabbitmq.client.messageProPerties; Kelas publik Newtask {//client namee = queeeecy; "Workqueue_persistence"; public static void main (string [] args) melempar ioException {// membuat koneksi dan saluran saluran Connection Factory = new connectionFactory (); factory.sethost ("localhost"); connection connection = factory.newconnection (); channel channel = connection. Antrian Persistence Channel.QueuedEclare (Queue_name, tahan lama, false, false, null); // Kirim 10 pesan, Menambahkan 1-10 poin setelah pesan pada gilirannya untuk (int i = 5; i> 0; i--) {string dots = ""; untuk (int j = 0; j <= i; i; i; i- dots +"." Dots.length (); // MessageProPerties 2. Setel pesan persistence channel.basicpublish ("", queue_name, messageProperties.persistent_text_plain, message.getbytes ()); System.out.println ("[x] dikirim '" + pesan + ""); channel.close (); connection.close ();}}Work.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;public class Work{// Queue name private final static String QUEUE_NAME = "workqueue_persistence";public static void main(String[] argv) melempar java.io.ioException, java.lang.interruptedException {// membedakan output dari berbagai proses pekerja int hashcode = work.class.hashcode (); // Buat koneksi dan saluran Connection factory = connectionFactory (); factory.sethost ("localhost localhost"); localhost "; connection.createChannel (); // menyatakan antrian boolean tahan lama = true; channel.queuedeclare (queue_name, tahan lama, false, false, null); System.out.println (hashcode + "[*] Menunggu pesan. Untuk keluar dari pers ctrl + c"); // Atur angka maksimum dari Maksimum Layanan ke depan. 1; channel.basicqos (prefetchcount); queueingconsumer konsumen = queueingconsumer baru (saluran); // Tentukan konsumsi antrian boolean ack = false; // nyalakan mekanisme respons. while (true) {queueingConsumer.delivery Delivery = consumer.nextDelivery (); string message = new string (Delivery.getBody ()); System.out.println (hashcode + "[x] diterima '" + pesan + "'"); doWork (pesan); System.out.println (hashcode + "[pesan); Selesai "); // channel.basicack (Delivery.getEnvelope (). GetDeliveryTag (), false); channel.basicack (Delivery.getEnvelope (). GetDeliveryTag (), false);}}/** * setiap poin memakan 1s * @param Tugas * @throws InterruprException oPoID ** Poin * @Param) @Throws InterruprException {Taspeprion Everseat * (char ch: task.tochararray ()) {if (ch == '.') thread.sleep (1000);}}}Meringkaskan
Di atas adalah semua penjelasan terperinci tentang kode antrian kerja Java dalam artikel ini, saya harap ini akan membantu semua orang. Jika ada kekurangan, silakan tinggalkan pesan untuk menunjukkannya. Terima kasih teman atas dukungan Anda untuk situs ini!