유명한 PHP-Amqplib 주변의 우아한 포장지. 90% 사용 케이스.
설치
AMQP 에이전트 정보
API
선적 서류 비치
구성
예
모래밭
특허
changelog
지금 AMQP 에이전트를 사용해보십시오.
composer require marwanalsoltany/amqp-agent composer.json 에이 구성을 복사하십시오.
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "달리다:
composer update 참고 : AMQP 에이전트는 기본적으로 버전 v1.1.1에서 기본적으로 PHP 7.1을 지원합니다. 이전 버전에서 php7.1-compatibility 브랜치를 사용한 경우 Composer.json을 업데이트하십시오!
AMQP 에이전트는 PHP 프로젝트에서 메시지 중개인의 구현을 단순화하려고합니다. RabbitMQ 서버 ( PHP-Amqplib )와 대화하기 위해 필요한 객체를 구축하고 구성하거나 클래스를 작성하는 전체 오버 헤드를 제거하고 거의 모든 프로젝트에 적합한 테스트, 완전 구성 가능하며 유연한 API를 노출시킵니다.
PHP-AMQPLIB 라이브러리는 훌륭하고 매우 잘 작동합니다. 한 가지 유일한 문제는 프로젝트에서 사용되는 것이 매우 맨발이며 자신의 래퍼 클래스를 다시 만들지 않고 스파게티 코드를 작성하지 않는 것은 거의 불가능합니다. 또한 함께 제공되는 엄청난 양의 함수, 메소드 및 구성 (매개 변수)을 사용하면 사용할 합리적인 API를 구현하기가 어렵습니다. AMQP 에이전트는 근로자에 대한 통제력을 잃지 않고 가능한 한 많은 추상화를 수행하고 메시지 브로커, 출판사 및 소비자와 관련된 용어를 다시 가져와 새로운 이민자라면 다루어야 할 모든 것입니다.
이 모토에 따르면 AMQP 에이전트는 RabbitMQ와 함께 영리하게 구현 된 유창한 인터페이스를 노출시키고 현대적인 PHP 개발에 적합하며 작업하기에 매우 간단하고 사용하기가 매우 간단합니다. 그러나 매우 강력하고 노동자와 함께 일하는 시점에서 가장 작은 단점을 덮어 쓸 수 있습니다. AMQP 에이전트를 사용하면 몇 줄의 코드로 메시지를 게시하고 소비 할 수 있습니다!
AMQP 에이전트는 PHP-Amqplib 의 어떤 것도 덮어 쓰지 않으며 그 기능과 관련된 용어를 변경합니다. 그것은 단순히 그것을 단순화합니다. 기능의 이름의 소음을 꺼내 일부 장소에서 확장합니다. 또한 근로자 명령, 동적 채널 웨이팅 및 촉진 방법과 같은 멋진 기능을 추가합니다.
AMQP 에이전트는 또한 IoT 프로젝트에 강력한 이벤트 기반 RPC 클라이언트 및 RPC 서버를 제공합니다.
AMQP 에이전트와의 작업은 다음과 같이 쉽습니다.
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();AMQP 에이전트는 직접 사용할 수있는 여러 콘크리트 클래스와 확장 할 수있는 기타 추상 클래스를 노출시킵니다. 이 두 클래스 분위기에는 또한 도우미 하위 분할이 있습니다.
| 수업 | 설명 | API |
|---|---|---|
AbstractWorker *A | 작업자의 기본 기능을 구현하는 추상 클래스. | 의사 |
출판사 *C*S | 출판 전문 수업. 게시자에게 필요한 방법 만 구현합니다. | 의사 |
소비자 *C*S | 소비를 전문으로하는 수업. 소비자에게 필요한 방법 만 구현합니다. | 의사 |
AbstractEndpoint *A | 엔드 포인트의 기본 기능을 구현하는 추상 클래스. | 의사 |
ClientEndpoint *C | 요청에 특화된 수업. 클라이언트에 필요한 방법 만 구현합니다. | 의사 |
ServerEndpoint *C | 응답을 전문으로하는 클래스. 서버에 필요한 메소드 만 구현합니다. | 의사 |
amqpagentParameters *C*H | 상수로 모든 AMQP 에이전트 매개 변수를 포함하는 클래스. | 의사 |
유틸리티 *C*H | 기타 도우미 기능을 포함하는 클래스. | 의사 |
이벤트 *C*H | 이벤트 처리를위한 간단한 수업 (파견 및 청취). | 의사 |
ArrayProxy *C*H | 조작 및 작업 어레이를위한 방법을 포함하는 클래스. | 의사 |
classproxy *C*H | 대리 방법 호출, 속성 조작 및 클래스 유틸리티를위한 방법이 포함 된 클래스. | 의사 |
Idgenerator *C*H | 고유 ID 및 토큰을 생성하기위한 함수가 포함 된 클래스 | 의사 |
시리얼 라이저 *C*H | 근로자와 함께 사용되는 유연한 세력화 제. | 의사 |
로거 *C*H | 정적으로 그리고 인스턴스화하는 방법을 노출시키는 로그를 작성하는 클래스. | 의사 |
싱글 톤 *A*H | 싱글 톤의 기본 기능을 구현하는 추상 클래스. | 의사 |
구성 *C*R | 구성 파일을 객체로 바꾸는 클래스. | 의사 |
클라이언트 *C*R | 클래스는 AMQP 에이전트가 제공하는 모든 것을 반환합니다. 말할 간단한 서비스 컨테이너. | 의사 |
예 *A*H | 소비자의 기본 콜백으로 사용되는 추상 클래스. | 의사 |
또한 : AbstractWorkersingleton, PublisherSingleton, ConsumerSingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerCommanDtrait, AbstractEndpointInterface, ClientEndPointInterface, ServerEndPointInterface, EventRayProxyTrait, ClassProxyTrait, AbstractParameters.
*C 콘크리트 : 이 클래스는 구체적인 클래스이며 직접 인스턴스화 할 수 있습니다.*A 초록 : 이 수업은 추상적 인 수업이며 직접 인스턴스화 할 수 없습니다.*H 헬퍼 : 이 수업은 도우미 수업입니다. 타사 대안은 대신 자유롭게 사용할 수 있습니다.*R 권장 : 이 클래스는 AMQP 에이전트 (모범 사례)와 함께 작업 할 때 사용하는 것이 좋습니다.*S Singleton : 이 클래스에는 Singleton 으로 클래스 이름을 접미사하여 사용할 수있는 싱글 톤 버전이 있으며 *Singleton::getInstance() , IE Publisher > PublisherSingleton 통해 검색 할 수 있습니다.참고 : 싱글 톤은 방지 방지로 간주되며, 사용 케이스가 있지만 가능한 한 많이 피하십시오. 당신이 무엇을하고 있는지 알고있는 경우에만 싱글 톤을 사용하십시오.
메시지를 신속하게 게시하고 소비하려면 모든 것이 이미 준비되고 구성되어 있다면 AMQP 에이전트에는 모범 사례를 따르는 테스트 구성이 있습니다. 파일에서 Publisher 클래스 및/또는 Consumer 클래스를 가져 와서 나중에 인스턴스의 나중에 원하는 매개 변수 (예 : RabbitMQ Credentials)를 덮어 쓸 수 있습니다.
정확한 요구에 맞게 AMQP 에이전트 구성을 미세 조정하고 조정하려면 약간의 작업이 있습니다. 구성 파일을 제공해야합니다 (MAKS-AMQP-AGENT-CONFIG.PHP 참조 및 주석에주의를 기울이십시오). 모든 것을 공급할 필요는 없으며, 덮어 쓰고 자하는 매개 변수 만 쓸 수 있습니다. AMQP 에이전트는 부족을 추가 할 수있을만큼 똑똑합니다. 이러한 매개 변수는 나중에 공개 할당 표기법 또는 메소드 호출을 통해 덮어 쓸 수도 있습니다.
사실 : AMQP 에이전트는 구성 파일과 메소드 호출에 전달 된 매개 변수 배열에서 동일한 매개 변수 이름을 php-amqplib와 사용합니다.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; 참고 : 배열 1 단계 키 이름 ( Options 으로 접미사)은 AMQP 에이전트에 특화되어 있습니다.
예제로 시작하기 전에 몇 가지를 명확히해야합니다. AMQP 에이전트를 사용하면 작업자를 검색하는 방법에 대한 여러 가지 방법이 있으며 간단한 방법, 권장 방법 및 고급 방식이 있습니다. 작업자를 회수 한 후에는 점토와 같습니다. 원하는 방식으로 형성 할 수 있습니다. 이 모듈 식 디자인은 귀하의 요구를 우아하게 수용하고 확장 가능한 코드베이스로 이동하며 단순히 모든 사람을 행복하게 만듭니다.
new 키워드를 사용하여 작업자를 직접 인스턴스화하는 것입니다. 이러한 방식으로 생성자, 메소드 호출 또는 공공 재산 할당을 통해 매개 변수를 전달해야합니다.PublisherSingleton::getInstance() 검색하는 것입니다. 이러한 방식으로 getInstance() 메소드, 메소드 호출 또는 공개 속성 할당을 통해 매개 변수를 전달해야합니다.Client 클래스의 인스턴스를 사용하는 것입니다. 이 방법으로는 전달 된 구성에서 매개 변수가 검색되므로 코드를 더 읽기 쉽게 만듭니다. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();사실 : 매개 변수를 공급할 때 필요한 매개 변수 만 제공합니다. AMQP 에이전트는 결함을 첨부하기에 충분히 똑똑합니다.
조언 : 원하는 매개 변수와 함께 구성 파일을 제공 한 후 Client 클래스 인스턴스에서 get($className) 사용하는 경우 위의 예제에 작성된 무거운 생성자를 단순화 할 수 있습니다.
참고 : 방법에 대한 전체 설명은 AMQP 에이전트 문서를 참조하십시오. 매개 변수에 대한 전체 설명은 RabbitMQ 문서 및 Php-Amqplib를 참조하십시오.
이 예에서는 실제 시나리오에서 AMQP 에이전트와 어떻게 작동하는지 알 수 있습니다.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();사실 : 구성 파일에서 모든 매개 변수의 변경을 작성하고 기본값 대신 클라이언트에게 전달하면 게시자/소비자 고급 예제에서 코드를 더 쉽게 만들 수 있습니다.
조언 : AMQP 에이전트 코드베이스가 잘 문서화되어 있습니다.이 링크를 참조하여 모든 클래스와 방법을 살펴보십시오.
AMQP 에이전트는 PHP-AMQPLIB 라이센스로 인해 GNU LGPL v2.1 에 따라 라이센스가 부여 된 오픈 소스 패키지입니다.
저작권 (c) 2020 Marwan al-Soltany. 모든 권리 보유.