名前付きキューを介してメッセージの送信と受信について書きました。まだわからない場合は、クリックしてください:Rabbitmq Javaを始めましょう。この記事では、消費者間で時間のかかるタスクを配布するための作業キューを作成します。
作業キューの主なタスクは、すぐにリソース集約型のタスクを実行しないようにし、それらが完了するのを待たなければならないことです。代わりに、タスクのスケジューリングを実行します。メッセージをメッセージとしてカプセル化し、キューに送信します。作業はバックグラウンドで実行され、キューからタスクを常に削除してから実行します。複数のワーカープロセスを実行すると、タスクキューのタスクはワーカープロセスによって共有されます。
このような概念は、非常に短いHTTP要求の間に複雑なタスクを実行する必要がある場合、Webアプリケーションで非常に役立ちます。
1。準備します
Thread.sleepを使用して、時間のかかるタスクをシミュレートします。キューに送信されたメッセージの最後に特定の数のポイントを追加します。各ポイントは、Helloなど、ワーカースレッドで1秒かかることを意味します... 3秒待つ必要があります。
送信者:
newtask.java
java.io.ioexception; Import com.rabbitmq.client.channel;インポートcom.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory; public class newtask {// queue name name private final static string queue_name = "workie" Channel ConnectionFactory = new ConnectionFactory(); factory.sethost( "localhost"); connection connection = factory.newconnection(); channel = connection.createchannel(); // Queue_name、fals、false、false、null); i ++){string dots = ""; for(int j = 0; j <= i; j ++){dots+= "。";} string message = "helloworld"+dots+dots.length(); channel.basicpublish( ""、queue_name、null、message.getBytes( "'");} // closeチャネルとリソースチャンネル.close(); connection.close();}}受信者:
work.java
com.rabbitmq.client.channelをインポート;インポートcom.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; Import com.rabbitmq.client.queueingconsumer;パブリッククラスの作業{// Quote Name name name final static string queue_namam java.io.ioexception、java.lang.interruptedexception {//異なるワーカープロセスを区別するint hashcode = work.class.hashcode(); //接続とチャネルConnectionFactory Factory = new ConnectionFactory(); factory.sethost( "localhost"); connection connection = connection.newconnectory.newconnection queue channel.queuedeclare(queue_name、false、false、false、null); system.out.println(hashcode + "[*]メッセージを待っている。ctrl + c"); queueingconsumer consumer = new QueueingConsumer(チャンネル); while(true){queueingconsumer.delivery deliviry = consumer.nextdelivery(); string message = new String(delivery.getbody()); out.println(hashcode + "[x] receive" " + message +" '"); dowork(message); system.out.out.println(hashcode +" done "); @param task * @throws arturnedexception */private static void dowork(string task)throws interruptedexception {for(char ch:task.tochararray()){if(ch == '。')swree(1000);}}}}}}}}ラウンドロビン転送
タスクキューを使用する利点は、簡単に並行して作業できることです。多くの作業バックログがある場合、より多くの労働者を追加して、システムをよりスケーラブルにすることで問題を解決できます。
次に、最初に3人の労働者(work.java)インスタンスを実行し、次にnewtask.javaを実行します。 3人の労働者インスタンスが情報を取得します。しかし、どのように割り当てますか?出力の結果を見てみましょう。
[x] sent 'helloworld.1' [x] send 'helloworld..2' [x] sent 'helloworld ... 'Helloworld .......... 10'労働者1:605645 [*]メッセージを待っています。 Ctrl+C605645 [x]を出るには、 'Helloworld.1'605645 [x] done605645 [x] sueed' helloworld .... 4'605645 [x] don605645 [x] sueed 'helloworld ......... 7'605645 [x] done605645 [x]を受信しました。 'Helloworld .......... 10'605645 [x] done worker 2:18019860 [*]メッセージを待っています。 ctrl+c18019860 [x]を出るには、 'helloworld..2'18019860 [x] done18019860 [x]を受信しました。 [*]メッセージを待っています。 ctrl+c18019860 [x]を出るには、「helloworld ... 3'18019860 [x] done18019860 [x]を受信しました。
ご覧のとおり、デフォルトでは、RabbitMQは、各タスクの期間などに関係なく、次の消費者に情報を1つずつ送信します。これは、1つの1つの割り当てではなく、1回限りの割り当てです。平均して、各消費者は同量の情報を受け取ります。メッセージを配布するこの方法は、Round-Robinと呼ばれます。
2。MessageAcKnowledgments
タスクを実行するには数秒かかります。労働者がタスクを実行するとき、あなたは中断を心配するかもしれません。上記のコードでは、RabbitMQが消費者にメッセージを配信すると、この情報をすぐにメモリから削除します。この場合、タスクを実行している労働者の1人が殺された場合、処理されている情報が失われます。また、この労働者に転送されたメッセージを失い、まだ実行されていません。
上記の例では、最初に2つのタスクを開始し、次にタスクを送信するコード(newtask.java)を実行し、すぐに2番目のタスクを閉じます。結果は次のとおりです。
ワーカー2:31054905 [*] waiteformessages.toexitpressctrl+c 31054905 [x] receive'hhelloworld..2 '31054905 [x] done 31054905 [x] receed'helloworld .... 4'ワーカー1:18019860 18019860 [x]受信'helloworld.1 '18019860 [x] done 18019860 [x] receive'hhellowld ... 3' 18019860 [x] done 18019860 [x] receed'helloworld ......... 5 '18019860 [x] 18019860 [x] done 18019860 [x]受信'helloworld ......... 9 '18019860 [x] done
2番目の労働者が少なくとも6、8、および10を失い、タスク4が完了していないことがわかります。
ただし、タスク(情報)を失いたくありません。労働者(受信者)が殺されたら、私たちはタスクを別の労働者に渡したいと思っています。
メッセージが決して失われないようにするために、RabbitMQはメッセージの謝辞をサポートします。消費者はRabbitmqに返信を送信し、情報が受信および処理されたことを伝え、RabbitMQは情報を自由に削除できます。
返信を送信せずに消費者が殺された場合、RabbitMQは情報が完全に処理されておらず、他の消費者にリダイレクトされると想定します。このようにして、消費者が時々殺されたとしても、情報が失われないことを確認できます。
このメカニズムは、タイムアウトが当てはまらないことを意味しません。 rabbitmqは、消費者接続が切断されている場合にのみこの情報を再投稿します。消費者がメッセージの処理に特に時間がかかる場合は許可されます。
メッセージ返信はデフォルトでオンになります。上記のコードでは、示されているようにAutoask = trueを設定することにより、このメカニズムをオフにします。コードを変更しましょう(work.java):
boolean ack = false; //応答メカニズムChannel.BasicConsume(Queue_Name、Ack、Consumer)を開きます。 //さらに、各処理がメッセージを完了した後、手動で返信する必要があります。 channel.basicack(delivery.getEnvelope()。getDeliverytag()、false);
完全に変更されたwork.java
com.rabbitmq.client.channelをインポート;インポートcom.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; Import com.rabbitmq.client.queueingconsumer;パブリッククラスの作業{// Quote Name name name final static string queue_namam java.io.ioexception、java.lang.interruptedexception {//異なるワーカープロセスを区別するint hashcode = work.class.hashcode(); //接続とチャネルConnectionFactory Factory = new ConnectionFactory(); factory.sethost( "localhost"); connection connection = connection.newconnectory.newconnection queue channel.queuedeclare(queue_name、false、false、false、null); system.out.println(hashcode + "[*]メッセージを待機します。 channel.basicconsume(queue_name、ack、consumer); while(true){queueingconsumer.delivery deliviry = consumer.nextdelivery(); string message = new String(delivery.getBody()); out.println(hashcode + "[x] receive '" + message + "'"); dowork(message); system.out.out.println(hashcode + "done"; Channel.basicack(delivery.getEnvelope()。getDeliverytag()、false);}}}}}テスト:
メッセージの数を5に変更し、2人の消費者(work.java)を開き、タスク(newtask.java)を送信し、すぐに1人の消費者を閉じて、出力を観察します。
[x] sent'helloworld..2 '[x] sent'helloworld ... 3' [x] sent'helloworld ... 4 '[x] sent'hhellowld ...... 5' worker2 18019860 [*] waiteformess.toexitpressctrl+c 18019860 [x]受信18019860 [x]受信'helloworld .... 4 'worker1 31054905 [*] waiteformessages.toexitpressctrl+c 31054905 [x]受信31054905 [x] done 31054905 [x]受信... 3' 3104905 [x] 31054905 [x]受信'helloworld .... 5 '31054905 [x] done 31054905 [x] receive'helloworld .... 4' 31054905 [x] done
労働者2によって完了しなかったタスク4が、完了のために労働者1に再フォアリングされていることがわかります。
3。メッセージ付き耐久性
消費者が殺されたとしても、メッセージは失われないことを学びました。しかし、この時点でRabbitMQサービスが停止した場合、メッセージはまだ失われます。
rabbitmqが異常に終了または終了すると、すべてのキューと情報が失われないようにしない限り、情報は失われます。情報が失われないようにするには、2つのことを行う必要があります。すべてのキューとメッセージに永続的なフラグを設定する必要があります。
まず、rabbitmqがキューを失うことは決してないことを確認する必要があります。これを行うには、それを永続的であると宣言する必要があります。
booleandurable = true;
Channel.QueueDeclare( "task_queue"、耐久性、false、false、null);
注:RabbitMQは、異なるパラメーターでキューを再定義することを許可していないため、すでに存在するキューの属性を変更することはできません。
第二に、情報を永続的であると特定する必要があります。 messageProperties(emplentionBasicProperties)値をpersistent_text_plainに設定します。
Channel.BasicPublish( "" "、" task_queue "、messageproperties.persist_plain、message.getBytes());
これで、メッセージを送信するプログラムを実行し、サービスを閉じてサービスを再起動し、コンシューマプログラムを実行して実験を行うことができます。
4。フェアディスパッチ
おそらく、現在のメッセージ転送メカニズム(ラウンドロビン)は私たちが望むものではないことがわかります。たとえば、この場合、2人の消費者の場合、一連のタスクがあり、奇妙なタスクは特に時間がかかりますが、タスクでさえ簡単です。
その理由は、rabbitmqがメッセージがキューに到着したときにのみメッセージを転送するからです。消費者がRabbitmqへの返信を提供していないタスクの数を気にしないでください。あるすべての奇数をある消費者に盲目的に転送し、偶数を別の消費者に偶然に転送します。
この問題を解決するために、basicqosメソッドを使用して、パラメーターをprefetchcount = 1として渡すことができます。これにより、Rabbitmqは、消費者に複数のメッセージを同時に提供しないように指示します。言い換えれば、次のメッセージは、消費者がアイドル状態の場合にのみ送信されます。
int prefetchcount = 1; channel.basicqos(prefetchcount);
注:すべての労働者が忙しい場合、キューが満たされる可能性があります。キューの使用法を観察してから労働者を追加するか、他の戦略を使用する場合があります。
テスト:コードを変更してメッセージを送信し、ポイントの終了数を6-2に変更してから、最初に2人のワーカーを開始してから、メッセージを送信します。
[x] sent 'helloworld ...... 6' [x] send 'helloworld ..... 5' [x] sent 'helloworld .... 4' [x] sent 'helloworld ... ctrl+c18019860 [x]を出るには、「helloworld ...... 6'18019860 [x] done18019860 [x]を受信しました。 ctrl+c31054905 [x]を出るには、 'helloworld ...... 5'31054905 [x] done31054905 [x]を受信しました。
この時点での以前のラウンドロビンメカニズムに従ってメッセージが転送されなかったが、消費者が忙しくないときに転送されたことがわかります。さらに、このモデルでは、メッセージが送信されず、動的増加がすぐに増加するため、消費者は動的に増加するようにサポートされます。デフォルトの転送メカニズムは、消費者が動的に追加されたとしても、メッセージが割り当てられており、多くの未完成のタスクがあっても、すぐに追加することはできません。
5。完全なコード
newtask.java
java.io.ioexception;インポートcom.rabbitmq.client.channel;インポートcom.rabbitmq.client.connection; import com.rabbitmq.client.connectionFactory;インポートcom.rabbitmq.client.client.messageproperties; public class newtask {// queue name final static string "; static void main(string [] args)throws ioexception {// Chreate connection and Channel connectionfactory factory = new ConnectionFactory(); factory.sethost( "localhost"); connection connection = factory.newconnection(); channel = connection.createchannel(); Channel.QueueDeclare(queue_name、durable、false、false、null); // 10のメッセージを送信します。10個のメッセージを送信し、1-10ポイント後に1-10ポイントを送信します。 MessageProperties2。SetMessage Persistence Channel.basicPublish( ""、queue_name、messageproperties.persistent_text_plain、message.getBytes();work.java
com.rabbitmq.client.channelをインポート;インポートcom.rabbitmq.client.connection; Import com.rabbitmq.client.connectionFactory; Import com.rabbitmq.client.queueingConsumer;パブリッククラスの作業{// Queue name name name final sating queue_namam java.io.ioexception、java.lang.interruptedexception {//異なるワーカープロセスの出力を区別するint hashcode = work.class.hashcode(); // connection and connectionfactory factory = new connectionFactory(); factory.sethost( "localhost"); connection = connection = factory.newconnectrid();キューブールの耐久性= true; channel.queuedeclare(queue_name、durable、false、false、null); system.out.println(hashcode + "[*]メッセージを待っている。 Consumer = new QueueIngConsumer(Channel); //消費キューを指定します。 while(true){queueingconsumer.delivery deliviry = consumer.nextdelivery(); string message = new string(delivery.getbody()); out.println(hashcode + "[x] receive '" + message + "'"); dowork(message); system.out.println(hashcode + "] done "); // channel.basicack(delivery.getEnvelope()。getDeliverytag()、false); channel.basicack(delivery.getEnvelope()。getDeliverytag()、false);}/** *各ポイントは1s * * @paramタスク */private viod dowork(string dowork(string dowork)を取ります。 task.tochararray()){if(ch == '。')swree.sleep(1000);}}}}}}要約します
上記は、この記事のJavaワークキューコードのすべての詳細な説明です。私はそれがすべての人に役立つことを願っています。欠点がある場合は、それを指摘するためにメッセージを残してください。このサイトへのご支援をありがとうございました!