90%のユースケースで有名なPHP-AMQPLIBの周りのエレガントなラッパー。
インストール
AMQPエージェントについて
API
ドキュメント
構成
例
リンク
ライセンス
Changelog
今すぐAMQPエージェントを試してみてください:
composer require marwanalsoltany/amqp-agent composer.jsonでこの構成をコピーします:
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "走る:
composer update注: AMQPエージェントは、バージョンv1.1.1から始まるデフォルトでPHP 7.1をサポートしています。古いバージョンでphp7.1-compatibilityブランチを使用した場合は、Composer.jsonを更新してください!
AMQPエージェントは、PHPプロジェクトでのメッセージブローカーの実装を簡素化しようとします。 RabbitMQサーバー( PHP-AMQPLIBを介して)と通信するために必要なオブジェクトの構築と構成のオーバーヘッド全体を奪い、ほとんどすべてのプロジェクトに適合するテスト済み、完全に構成可能で柔軟なAPIを公開します。
PHP-AMQPLIBライブラリは素晴らしく、非常にうまく機能しています。唯一の問題は、プロジェクトで使用することはかなり裸の骨であり、独自のラッパークラスを作り直さずに、Spaghettiコードを書かないことはほとんど不可能です。さらに、それに付属する膨大な量の機能、メソッド、および構成(パラメーター)により、使用する合理的なAPIを実装することは非常に困難です。 AMQPエージェントは、労働者のコントロールを失うことなくできるだけ多くの抽象化を行うことで、この問題を解決し、メッセージブローカーに関連する用語を取り戻すことで、出版社と消費者があなたが新人である場合に対処する必要があるすべてです。
このモットーによれば、AMQPエージェントは、巧妙に実装され、最新のPHP開発に適合し、仕事ができ、非常に簡単に使用できるような流fluentインターフェイスを露出させることにより、RabbitMQを可能な限り楽しくエレガントに作業することを可能にします。しかし、非常に強力であり、労働者と協力する任意の時点で最小の癖を上書きすることができます。 AMQPエージェントを使用すると、数行のコードでメッセージの公開と消費を開始できます!
AMQPエージェントは、 php-amqplibのものを上書きせず、その機能に関連する用語を変更しません。それは単純化するだけです。関数の名前のノイズを取り出し、一部の場所に拡張します。また、ワーカーコマンド、ダイナミックチャネル待機、円滑化の方法などの素晴らしい機能も追加されています。
AMQPエージェントは、IoTプロジェクトに強力なイベントベースのRPCクライアントとRPCサーバーも提供しています。
AMQPエージェントとの作業は、次のように簡単になります。
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();AMQPエージェントは、直接使用できる多くのコンクリートクラスと、拡張できる他の抽象クラスを公開します。これらの2人のクラスバリエントには、ヘルパーサブディビジョンもあります。
| クラス | 説明 | API |
|---|---|---|
要約ワーカー*A | 労働者の基本機能を実装する抽象クラス。 | doc |
出版社*C*S | 出版に特化したクラス。出版社に必要な方法のみを実装します。 | doc |
消費者*C*S | 消費に特化したクラス。消費者に必要な方法のみを実装します。 | doc |
AbstractEndPoint *A | エンドポイントの基本機能を実装する抽象クラス。 | doc |
ClientEndPoint *C | リクエストに特化したクラス。クライアントに必要な方法のみを実装します。 | doc |
ServerEndPoint *C | 応答に特化したクラス。サーバーに必要なメソッドのみを実装します。 | doc |
amqpagentparameters *C*H | 定数としてすべてのAMQPエージェントパラメーターを含むクラス。 | doc |
ユーティリティ*C*H | その他のヘルパー機能を含むクラス。 | doc |
イベント*C*H | イベントを処理するためのシンプルなクラス(派遣とリスニング)。 | doc |
arrayproxy *C*H | 操作および作業アレイのための方法を含むクラス。 | doc |
ClassProxy *C*H | 呼び出し、プロパティの操作、およびクラスユーティリティの代理方法の方法を含むクラス。 | doc |
idgenerator *C*H | 一意のIDとトークンを生成するための機能を含むクラス | doc |
Serializer *C*H | 労働者と組み合わせて使用する柔軟なシリアナー。 | doc |
ロガー*C*H | ログを作成するクラス、静的およびインスタンス化時に動作する方法を公開します。 | doc |
シングルトン*A*H | シングルトンの基本的な機能を実装する抽象クラス。 | doc |
config *C*R | 構成ファイルをオブジェクトに変えるクラス。 | doc |
クライアント*C*R | クラスは、AMQPエージェントが提供するすべてのものを返します。言うようにシンプルなサービスコンテナ。 | doc |
例*A*H | 消費者のデフォルトのコールバックとして使用される抽象クラス。 | doc |
参照:AbstractWorkerSingleton、Publishersingleton、Consumersingleton、AbstractWorkerInterface、PublisherInterface、ConsumerInterface、WorkerFacilitationInterface、WorkerMatutionTrait、WorkErcommandTrat、AbstractEndPointInterface、ClientEndpointインターフェイス、ServerEndPointInterface、EventTrait、ArrayProxyTrait、ClassProxytrait、Arsproxytrait、ArrayProxytrait。
*Cコンクリート:このクラスは具体的なクラスであり、直接インスタンス化できます。*A要約:このクラスは抽象クラスであり、直接インスタンス化することはできません。*Hヘルパー:このクラスはヘルパークラスです。代わりに、サードパーティの代替品を自由に使用できます。*Rが推奨:このクラスは、AMQPエージェント(ベストプラクティス)で作業するときに使用することをお勧めします。*S Singleton:このクラスには、Singletonのクラス名を接尾辞で介してSingletonバージョンを使用でき、 *Singleton::getInstance() 、IE Publisher > PublisherSingletonを介して取得できます。注:シングルトンはアンチパターンと見なされます。可能な限り避けてみてください。あなたが何をしているのかを知っている場合にのみ、シングルトンを使用してください。
すぐにメッセージを公開して消費したい場合、すべてが準備ができて構成されている場合、AMQPエージェントには、ベストプラクティスに続くテスト済みの構成が付属しています。ファイルにPublisherクラスやConsumerクラスをインポートして、必要なパラメーター(たとえば、RabbitMQ資格情報)をインスタンスの後で上書きするだけです。
AMQPエージェントの構成を正確なニーズに合わせて微調整して調整したい場合は、やるべきことが少しあります。構成ファイルを提供する必要があります(maks-amqp-agent-config.phpを参照して、コメントに注意してください)。すべてを提供する必要はありません。上書きするパラメーターのみを書き込むことができます。AMQPエージェントは、欠陥を追加するのに十分賢いです。これらのパラメーターは、パブリック割り当て表記またはメソッド通話ごとに後で上書きすることもできます。
事実: AMQPエージェントは、構成ファイルとメソッド呼び出しで渡されたパラメーター配列でPHP-AMQPLIBと同じパラメーター名を使用します。
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
];注:アレイの最初のレベルのキー名( Optionsが付いた接尾辞)は、AMQPエージェントに固有です。
例から始める前に、いくつかのことを明確にする必要があります。最初から、AMQPエージェントには、労働者を取得する方法、簡単な方法、推奨方法、より高度な方法があることに複数の方法があることに言及する価値があります。労働者を回収した後、それは粘土のようなもので、あなたが望むようにそれを形成することができます。このモジュラーデザインは、あなたのニーズに優雅に対応し、スケーラブルなコードベースに駆り立て、誰もが幸せになります。
newキーワードを使用しています。この方法では、コンストラクター、メソッド呼び出し、または公共財産の割り当てを介してパラメーターを渡す必要があります。PublisherSingleton::getInstance()を取得することです。この方法では、 getInstance()メソッド、メソッド呼び出し、または公共財産の割り当てを介してパラメーターを渡す必要があります。Clientクラスのインスタンスを使用することです。この方法では、パラメーターが渡された構成から取得されるため、コードがより読みやすくなります。 // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();事実:パラメーターを提供する場合、必要なパラメーターのみを提供します。 AMQPエージェントは、不足を追加するのに十分賢いです。
アドバイス:上記の例に記述された重いコンストラクターを簡素化できます。 Clientクラスのインスタンスで、必要なパラメーターを提供した後、クライアントクラスのインスタンスでget($className)を使用できます。
注:メソッドの完全な説明については、AMQPエージェントドキュメントを参照してください。パラメーターの完全な説明については、RabbitMQドキュメントとPHP-AMQPLIBを参照してください。
これらの例では、実際のシナリオでAMQPエージェントとどのように動作するかがわかります。
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();事実:設定ファイルにすべてのパラメーターの変更を行い、デフォルトではなくクライアントに渡すと、パブリッシャー/コンシューマーの高度な例のコードをより簡単にすることができます。
アドバイス: AMQPエージェントコードベースは十分に文書化されています。このリンクを参照して、すべてのクラスと方法を調べてください。
AMQPエージェントは、PHP-AMQPLIBライセンスにより、 GNU LGPL v2.1に基づいてライセンスされたオープンソースのパッケージです。
著作権(c)2020 Marwan al-Soltany。無断転載を禁じます。