우리는 이름이 지정된 큐를 통해 메시지를 보내고받는 것에 대해 썼습니다. 아직 모르는 경우 클릭하십시오 : Rabbitmq Java를 시작하십시오. 이 기사에서는 소비자에게 시간이 많이 걸리는 작업을 배포하는 작업 대기열을 만들 것입니다.
작업 대기열의 주요 작업은 즉시 리소스 집약적 인 작업을 즉시 실행하는 것을 피한 다음 완료되기까지 기다려야합니다. 대신 작업 스케줄링을 수행합니다. 작업을 메시지로 캡슐화하여 대기열로 보냅니다. 작업은 백그라운드에서 실행되며 큐에서 지속적으로 작업을 제거한 다음 실행합니다. 여러 작업자 프로세스를 실행하면 작업 대기열의 작업이 작업자 프로세스에서 공유됩니다.
이러한 개념은 매우 짧은 HTTP 요청 사이에 복잡한 작업을 수행 해야하는 경우 웹 응용 프로그램에서 매우 유용합니다.
1. 준비
우리는 시간이 많이 걸리는 작업을 시뮬레이션하기 위해 Thread.Sleep을 사용합니다. 우리는 큐로 전송 된 메시지 끝에 특정 포인트를 추가합니다. 각 점은 Hello와 같은 작업자 스레드에서 1 초가 걸립니다 ... 3 초를 기다려야합니다.
보내는 사람:
Newtask.java
import java.io.ioexception; import com.rabbitmq.client.Channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; public class newtask {// Queue 이름 사립 정적 정적 정적 void main (string) iooircection (strows) iooirclection {arge void incless and are queue newtask {// Queue 이름 ConnectionFactory factory = new ConnectionFactory (); factory.sethost ( "localHost"); Connection Connection = factory.NewConnection (); Channel Channel = Connection.CreateChannel (); // 큐 channel.queuedeClare (queue_name, False, False, NULL); // chefence 1-10 포인트 (int i = 0); {string dots = ""; for (int j = 0; j <= i; j ++) {dots += ". " '");} // 채널 및 자원 채널을 닫습니다 .Close (); connection.close ();}}수화기:
work.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingConsumer; public class Work {// QUOTE 이름 개인 정적 문자열 queue_name = "Workqueue"; public void main (string []). java.io.ioexception, java.lang.interruptedexception {// 다른 작업자 프로세스를 구별하는 int hashcode = work.class.hashcode (); hashcode (// 연결 및 채널 연결 공장 = new ConnectionFactory (); factory.sethost.sethost ( "localHost"); channel = connection (); 대기열 채널 .queuedeClare (queue_name, false, false, false, null); system.out.println (Hashcode + "[*] 메시지 대기 중. ctrl + c"); QueueingConsumer 소비자 = 새로운 대기열 큐어 (채널); // specifify the concecor (specifice the concecor) (queueingconsumer); while (true) {queueingconsumer.delivery delivery = consumer.nextDelivery (); String Message = new String (delivery.getBody ()); system.out.println (hashcode + "[x] 수신 '" + 메시지 + "'"); dowork (message); system.out.println (hashcode + " * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * @param task * @Throws InterruptedException */private static void dowork (문자열 작업)은 InterruptedException {for (char ch : task.tochararray ()) {if (ch == '.') stread (1000);}}} 라운드 로빈 전달
작업 대기열을 사용하는 장점은 쉽게 병렬로 작동 할 수 있다는 것입니다. 작업 백 로그가 많으면 더 많은 작업자를 추가하여 시스템을보다 확장 가능하게하여 문제를 해결할 수 있습니다.
다음으로 먼저 3 명의 작업자 (Work.java) 인스턴스를 실행 한 다음 Newtask.java를 실행합니다. 세 명의 근로자 사례는 정보를 얻을 것입니다. 하지만 할당하는 방법? 출력 결과를 살펴 보겠습니다.
[x]는 'Helloworld.1'[x]를 보냈다. 'Helloworld ......... 9'[x]는 'HelloWorld .......... 10'Worker 1 : 605645 [*] 메시지를 기다렸다. Ctrl+c605645 [x]를 퇴치하려면 'Helloworld.1'605645 [x] done605645 [x]를 받았습니다. 'Helloworld .......... 10'605645 [x] 완료된 작업자 2 : 18019860 [*] 메시지 대기. Ctrl+C18019860 [x]를 퇴치하려면 'Helloworld..2'18019860 [x] done18019860 [x]를 받았습니다. 18019860 [*] 메시지를 기다리고 있습니다. Ctrl+C18019860 [x]를 출시하려면 'Helloworld ... 3'18019860 [x] done18019860 [x]를 받았습니다.
보시다시피, 기본적으로 RabbitMQ는 각 작업의 지속 시간에 관계없이 다음 소비자에게 하나씩 정보를 보내며, 일대일 할당이 아니라 일회성 할당입니다. 평균적으로 각 소비자는 동일한 양의 정보를받습니다. 메시지를 배포하는이 방법을 라운드 로빈이라고합니다.
2. MessageAcknowledgments
작업을 수행하는 데 몇 초가 걸립니다. 작업자가 작업을 수행 할 때 중단이 걱정 될 수 있습니다. 위의 코드에서 RabbitMQ가 소비자에게 메시지를 전달하면 즉시 메모리 에서이 정보를 제거합니다. 이 경우 작업을 수행하는 근로자 중 한 명이 사망하면 처리중인 정보를 잃게됩니다. 우리는 또한이 작업자에게 전달 된 메시지를 잃을 것이며 아직 실행되지 않았습니다.
위의 예에서는 먼저 두 가지 작업을 시작한 다음 작업 (newtask.java)을 보내는 코드를 실행 한 다음 즉시 두 번째 작업을 닫으면 결과가 다음과 같습니다.
작업자 2 : 31054905 [*] WaitingFormessages.ToExitPressCtrl+C 31054905 [X] 수신 된 'HelloWorld..2'31054905 [X] 완료 31054905 [X] Hearly'HellowOrld .... 4 '작업자 1 : 18019860 [*] 대기 성능 18019860 [x] 수신 된 'helloworld.1'18019860 [x] 완료 18019860 [x] 수신 된 'helloworld ... 3'18019860 [x] 완료 18019860 [x] 수신 된 'helloworld ......... 5'18019860 [x] 18019860 [x] 18019860 [x]. 18019860 [x] 완료 18019860 [x] 수신 된 'helloworld ......... 9'18019860 [x] 완료
두 번째 근로자는 최소한 과제 6, 8 및 10을 잃었고 작업 4가 완료되지 않았다는 것을 알 수 있습니다.
그러나 우리는 작업 (정보)을 잃고 싶지 않습니다. 근로자 (수령인)가 살해되면 다른 근로자에게 임무를 전달하고 싶습니다.
메시지가 손실되지 않도록하기 위해 RabbitMQ는 메시지 승인을 지원합니다. 소비자는 RabbitMQ에 답장을 보내 정보가 수신 및 처리되었다고 말한 다음 RabbitMQ가 정보를 자유롭게 삭제할 수 있습니다.
소비자가 답장을 보내지 않고 살해 된 경우 RabbitMQ는 정보가 완전히 처리되지 않았으며 다른 소비자에게 리디렉션 될 것이라고 가정합니다. 이런 식으로 소비자가 때때로 살해 되더라도 정보가 손실되지 않았 음을 확인할 수 있습니다.
이 메커니즘은 타임 아웃이 사실이 아니라는 것을 의미하지는 않습니다. RabbitMQ는 소비자 연결이 연결이 끊어 질 때만이 정보를 다시 반환합니다. 소비자가 메시지를 처리하는 데 특히 오랜 시간이 걸리면 허용됩니다.
메시지 응답은 기본적으로 켜져 있습니다. 위의 코드에서는 표시된대로 Autoask = True를 설정 하여이 메커니즘을 끕니다. 코드를 수정하겠습니다 (work.java) :
부울 ACK = 거짓; // 응답 메커니즘을 엽니 다. BasicConsume (queue_name, ack, consumer); // 또한 각 처리가 메시지를 완료 한 후에는 회신을 수동으로 보내야합니다. channel.basicack (delatiption.getenvelope (). getDeliveryTag (), false);
완전히 수정 된 작업
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingConsumer; public class Work {// QUOTE 이름 개인 정적 문자열 queue_name = "Workqueue"; public void main (string []). java.io.ioexception, java.lang.interruptedexception {// 다른 작업자 프로세스를 구별하는 int hashcode = work.class.hashcode (); hashcode (// 연결 및 채널 연결 공장 = new ConnectionFactory (); factory.sethost.sethost ( "localHost"); channel = connection (); 대기열 채널 .queuedeClare (queue_name, false, false, false, null); system.out.println (Hashcode + "[*] 메시지 대기 중. ctrl + c를 누르기 위해 기다리기 위해. channel.basicConsume (queue_name, ACK, 소비자); while (true) {queueingconsumer.delivery delivery = consumer.nextDelivery (); string message = new String (delivery.getBody ()); System.out.println (hashcode + "[x] 수신 '" + 메시지 + "'"); Dowork (메시지); System.out.println (hashcode + "done"); channel.basicack (delatiption.getenvelope (). getDeliveryTag (), false;}}}시험:
우리는 메시지 수를 5로 변경 한 다음 두 소비자 (Work.java)를 열린 다음 작업 (newtask.java)을 보내고 즉시 소비자 1 명을 닫고 출력을 관찰합니다.
[x] sent'helloworld..2 '[x] sent'helloworld ... 3'[x] sent'helloworld ... 4 '[x] sent'helloworld ...... 5'worker2 18019860 [*] WaitingFormessages.toexitpressctrl+c 18019860 [x] 수신 된 'helloworld..2'18019860 [x] 18019860 [x] 수신 된 'helloworld .... 4'worker1 31054905 [*] 대기 성분 31054905 [x] 수신 된 'helloworld .... 5'31054905 [x] 완료 31054905 [x] 수신 된 'helloworld .... 4'31054905 [x] 완료
작업자 2에 의해 완료되지 않은 작업 4가 완료를 위해 작업자 1로 다시 향하고 있음을 알 수 있습니다.
3. 내구성이 내린 메시지
우리는 소비자가 살해 되더라도 메시지가 손실되지 않을 것임을 배웠습니다. 그러나 현재 RabbitMQ 서비스가 중단되면 메시지가 여전히 손실됩니다.
RabbitMQ가 비정상적으로 종료되거나 빠져 나가면, 잃지 말라고 말하지 않으면 모든 대기열과 정보가 손실됩니다. 정보가 손실되지 않도록 두 가지 작업을 수행해야합니다. 모든 대기열과 메시지에 대한 지속적인 플래그를 설정해야합니다.
먼저, 우리는 RabbitMQ가 우리의 줄을 잃지 않을 것임을 확인해야합니다. 이를 위해서는이를 지속적으로 선언해야합니다.
부울 가능 = 참;
channel.queuedeClare ( "task_queue", 내구성, 거짓, 거짓, null);
참고 : RabbitMQ는 다른 매개 변수로 큐를 재정의 할 수 없으므로 이미 존재하는 큐의 속성을 수정할 수 없습니다.
둘째, 우리는 정보를 영구적으로 식별해야합니다. MessageProperties (emplementsBasicProperties) 값을 persistent_text_plain으로 설정하십시오.
channel.basicpublish ( "", "task_queue", messageProperties.Persistent_text_plain, message.getBytes ());
이제 메시지를 보내는 프로그램을 실행 한 다음 서비스를 닫고 서비스를 다시 시작한 후 소비자 프로그램을 실행하여 실험을 수행 할 수 있습니다.
4. Fairdispatch
아마도 우리는 현재 메시지 전달 메커니즘 (라운드 로빈)이 우리가 원하는 것이 아니라는 것을 알게 될 것입니다. 예를 들어,이 경우 두 소비자의 경우 일련의 작업이 있으며, 홀수 작업은 특히 시간이 많이 걸리는 반면, 작업은 쉽고 작업은 쉽기 때문에 한 소비자는 바쁘게 진행되는 반면 다른 소비자는 작업을 신속하게 완료하고 완료 후 대기합니다.
그 이유는 RabbitMQ가 메시지가 큐에 도착했을 때만 메시지를 전달하기 때문입니다. 소비자가 RabbitMQ에 답장을 전달하지 않은 작업의 수는 신경 쓰지 마십시오. 모든 홀수를 한 소비자에게 맹목적으로 전달하고 다른 소비자에게 숫자를 전달하십시오.
이 문제를 해결하기 위해 BasicQOS 메소드를 사용하여 매개 변수를 PrefetchCount = 1으로 전달할 수 있습니다. 이것은 RabbitMQ에게 소비자에게 동시에 하나 이상의 메시지를 제공하지 말라고 지시합니다. 다시 말해, 다음 메시지는 소비자가 유휴 상태 일 때만 전송됩니다.
int prefetchCount = 1; Channel.basicQos (prefetchCount);
참고 : 모든 근로자가 바쁘면 대기열이 채워질 수 있습니다. 대기열 사용을 관찰 한 다음 작업자를 추가하거나 다른 전략을 사용할 수 있습니다.
테스트 : 코드를 변경하여 메시지를 보내고, 끝 포인트 수를 6-2로 변경 한 다음 먼저 두 작업자를 시작한 다음 메시지를 보냅니다.
[x]는 'Helloworld ...... 6'[x]를 보냈다 'Helloworld ..... 5'[x] 보낸 'Helloworld .... 4'[x] 보낸 'Helloworld ... 3'[x] 보낸 'Helloworld..2'Worker 1 : 18019860 [*] 메시지를 기다렸다. Ctrl+C18019860 [x]를 출시하려면 'Helloworld ...... 6'18019860 [x] done18019860 [x]를 받았습니다. Ctrl+C31054905 [X]를 퇴치하려면 'HelloWorld ...... 5'31054905 [x] done31054905 [x]를 받았습니다'Helloorld ...... 4'31054905 [x] Done Done
현재 이전 라운드 로빈 메커니즘에 따라 메시지가 전달되지 않았지만 소비자가 바쁘지 않을 때 전달되었습니다. 또한이 모델에서는 메시지가 전송되지 않아서 동적 인 증가가 즉시 증가하기 때문에 소비자는 동적으로 증가하도록 지원됩니다. 소비자가 동적으로 추가 되더라도 기본 전달 메커니즘은 메시지가 할당되었으며 미완성 된 작업이 많이 있더라도 즉시 추가 할 수 없습니다.
5. 코드를 완료하십시오
Newtask.java
import java.io.ioexception; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.messageProperties; public newtask {// queue queue queue queue_name "; 정적 void main (string [] args)은 ioexception을 던집니다 {// 연결 및 채널 ConnectionFactory 팩토리 = New ConnectionFactory (); Factory.Sethost ( "localHost"); Connection Connection = Factory.NewConnection (); Connection.CreateChannel (); // queue boolean Durable = true; // 1. queue PersitionSence를 선언합니다. channel.queuedeClare (queue_name, 내구성, 거짓, 거짓, 거짓, null); // 10 개의 메시지를 보내고, (int i = 5; i> 0; i--) {string dots = ""; for (j = 0; j <= i; j ++) {dots += "; dots.length (); // MessageProperties 2. 메시지 지속성 채널을 설정하십시오 .BasicPublish ( "", queue_name, messageProperties.Persistent_Text_plain, message.getBytes (); system.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.out.println ( "[x]" + message + " '');} // clos and racple and racplece. channel.close (); connection.close ();}}work.java
import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingconsumer; public class Work {// queue private final static string queue_name = "problic void (strow)"; java.io.ioexception, java.lang.terruptedexception {// 다른 작업자 프로세스의 출력을 차별화 int hashcode = work.class.hashcode (); hashcode (// 연결 및 채널 연결 팩토리 = new ConnectionFactory (); factory.sethost ( "localHost"); Connection =/////// // 대기열 부울 내구성 = true; channel.queuedeClare (queue_name, 내구성, 내구성, 거짓, 거짓, null); system.out.println (hashcode + "[*] 메시지 대기 대기. ctrl + c"를 종료하려면 서비스에 대한 최대 전달 된 메시지 int precetchcount = 1; Channel.BasicQOS (PrefetchCount); 대기열 소비자 소비자 = 새로운 대기열 소비자 (채널); // 소비 대기열 Boolean ACK = False; // 응답 메커니즘 채널을 켜십시오 .BasicConsume (Queue_name, ACK, 소비자); while (true) {queueingconsumer.delivery delivery = consumer.nextDelivery (); String Message = new String (delivery.getbody ()); System.out.println (Hashcode + "[x] 수신 '" + 메시지 + ""); Dowork (메시지); System.out.println (Hashcode + "[x] done "); // channel.basicack (delivery.getenvelope (). getDeliveryTag (), false); channel.basicack (velivery.getenvelope (). getDeliveryTag (), false;}}/** * 각 포인트는 1s * * @param task * @Throws Interrupted void void void void void void void void void void void void void void incrutection을 취합니다. : task.toChararray ()) {if (ch == '.') thread.sleep (1000);}}}요약
위의 내용은이 기사에서 Java Work 큐 코드에 대한 자세한 설명입니다. 모든 사람에게 도움이되기를 바랍니다. 단점이 있으면 메시지를 남겨 두십시오. 이 사이트를 지원해 주신 친구들에게 감사드립니다!