غلاف أنيق حول PHP-AMQPLIB الشهير لحالة الاستخدام 90 ٪.
تثبيت
حول وكيل AMQP
API
الوثائق
إعدادات
أمثلة
الروابط
رخصة
Changelog
جرب وكيل AMQP الآن:
composer require marwanalsoltany/amqp-agent انسخ هذا التكوين في composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "يجري:
composer update ملاحظة: يدعم AMQP Agent الآن PHP 7.1 افتراضيًا بدءًا من الإصدار v1.1.1 ، إذا استخدمت فرع php7.1-compatibility في الإصدارات القديمة ، قم بتحديث الملحن الخاص بك!
يحاول Amqp Agent تبسيط تنفيذ وسيط الرسائل في مشروع PHP. إنه يسلب النفقات العامة الكاملة لبناء وتكوين الكائنات أو إنشاء فئات ستحتاجها من أجل التحدث مع RabbitMQ Server (من خلال PHP-AMQPlib ) وتكشف واجهة برمجة تطبيقات مختبرة وقابلة للتكوين بالكامل ومرنة تناسب أي مشروع تقريبًا.
مكتبة PHP-AMQPLIB رائعة وتعمل بشكل جيد للغاية. المشكلة الوحيدة والوحيدة هي ، إنها عارية إلى حد ما لاستخدامها في المشروع ، دون إعادة تشكيل فصول الغلاف الخاصة بك ، يكاد يكون من المستحيل عدم كتابة رمز السباغيتي. بالإضافة إلى الكمية الهائلة من الوظائف والأساليب والتكوينات (المعلمات) التي تأتي معها تجعل من الصعب تطبيق واجهة برمجة تطبيقات معقولة لاستخدامها. AMQP Agent يحل هذه المشكلة من خلال جعل أكبر قدر ممكن من التجريد دون فقدان السيطرة على العمال وبإعادة المصطلحات المرتبطة بوسطاء الرسائل ، فإن الناشر والمستهلك هو كل ما تحتاج إلى التعامل معه إذا كنت قادمًا جديدًا.
وفقًا لهذا الشعار ، يجعل AmqP Agent العمل مع RabbitMQ ممتعة وأنيقة قدر الإمكان من خلال تعريض بعض الواجهات التي يتم تنفيذها بذكاء ، وتنمية PHP الحديثة ، ومن الجيد العمل معها وبسيطة للغاية ؛ ومع ذلك ، يمكن أن تكتب أصغر المراوغات في أي نقطة من العمل مع العامل. مع AmqP Agent ، يمكنك البدء في نشر واستهلاك الرسائل مع بضعة أسطر من التعليمات البرمجية!
لا يقوم Amqp Agent بكتابة أي شيء من PHP-AMQPLIB ولا يغير المصطلحات المرتبطة بوظائفه. إنه يبسطه فقط ؛ يخرج ضجيج أسماء الوظائف ويمتدها في بعض الأماكن. كما أنه يضيف بعض الميزات اللطيفة مثل الأوسمة العمال ، وانتظار القناة الديناميكية ، وطرق التسهيل.
يقدم AmqP Agent أيضًا عميل RPC القوي على الحدث وخادم RPC لمشاريع إنترنت الأشياء الخاصة بك.
يمكن أن يكون العمل مع وكيل AMQP سهلاً مثل:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();يعرض AmqP Agent عددًا من الفئات الخرسانية التي يمكن استخدامها مباشرة والفئات المجردة الأخرى التي يمكن تمديدها. هذان الشخصان الطبقيان لديهم أيضا قسم فرعي مساعد.
| فصل | وصف | API |
|---|---|---|
مجردة عامل *A | فئة مجردة تنفذ الوظيفة الأساسية للعامل. | مستند |
الناشر *C*S | فئة متخصصة في النشر. تنفيذ الأساليب اللازمة للناشر فقط. | مستند |
المستهلك *C*S | فئة متخصصة في الاستهلاك. تنفيذ الأساليب المطلوبة فقط للمستهلك. | مستند |
AbstractEndPoint *A | فئة مجردة تنفذ الوظيفة الأساسية لنقطة النهاية. | مستند |
ClientEndPoint *C | فئة متخصصة في الطلب. تنفيذ الأساليب فقط اللازمة للعميل. | مستند |
ServerEndPoint *C | فئة متخصصة في الرد. تنفيذ الأساليب المطلوبة فقط لخادم. | مستند |
AMQPagentParameters *C*H | فئة تحتوي على جميع معلمات وكيل AMQP كثوابت. | مستند |
فائدة *C*H | فئة تحتوي على وظائف المساعد المتنوعة. | مستند |
الحدث *C*H | فئة بسيطة للتعامل مع الأحداث (الإرسال والاستماع). | مستند |
arrayproxy *C*H | فئة تحتوي على أساليب لمعالجة المصفوفات والعمل. | مستند |
classproxy *C*H | فئة تحتوي على أساليب لطرق الوكيل ، والمعالجة خصائص ، والمرافق الطبقية. | مستند |
idgenerator *C*H | فئة تحتوي على وظائف لتوليد معرفات فريدة ورموز | مستند |
Serializer *C*H | المسلسل المرن لاستخدامه بالتزامن مع العمال. | مستند |
المسجل *C*H | فئة لكتابة سجلات ، وكشف الأساليب التي تعمل بشكل ثابت وعلى مثيل. | مستند |
Singleton *A*H | فئة مجردة تنفذ الوظيفة الأساسية للفرد. | مستند |
config *C*R | فئة تحول ملف التكوين إلى كائن. | مستند |
العميل *C*R | يعيد الفصل كل ما يقدمه وكيل AMQP. حاوية خدمة بسيطة حتى أقول. | مستند |
مثال *A*H | فئة مجردة تستخدم كاستدعاء افتراضي للمستهلك. | مستند |
راجع أيضًا: Abstractworkersingleton ، Publishersingleton ، المستهلكين ، ForctionWorkerInterface ، PublisherInterface ، ConsumerInterface ، WorkerFaciLitationInterface ، WorkermattionTrait ، WorkerCommandTrait ، AbstractEndPointInterface ، ClientendPointInterface ، ServerendPointInterface ، EventTrait ، ArrayProxyTrait ،
*C ملموسة: هذه الفئة هي فئة ملموسة ويمكن إنشاء مثيل لها مباشرة.*A ملخص: هذا الفصل هو فئة مجردة ولا يمكن إنشاء مثيل لها مباشرة.*H Helper: هذا الفصل هو فئة مساعد. يمكن استخدام بدائل الطرف الثالث بحرية بدلاً من ذلك.*R موصى به: يوصى باستخدام هذا الفصل عند العمل مع AMQP Agent (أفضل الممارسات).*S Singleton: يحتوي هذا الفصل على نسخة مفردة متوفرة عبر اللاحقة اسم الفصل مع Singleton ويمكن استردادها عبر *Singleton::getInstance() ، أي Publisher -> PublisherSingleton .ملاحظة: يعتبر Singleton بمثابة نماذج ، حاول تجنبها قدر الإمكان ، على الرغم من وجود حالات استخدام لذلك. استخدم Singletons فقط إذا كنت تعرف ما تفعله.
إذا كنت تريد فقط نشر الرسائل واستهلاكها ، فكل شيء جاهز وتكوينه بالفعل ، يتم شحن AmqP Agent بتكوين تم اختباره يتبع أفضل الممارسات. يمكنك ببساطة استيراد فئة Publisher و/أو فئة Consumer في ملفك والكتابة فوق المعلمات التي تريدها (بيانات اعتماد RabbitMQ على سبيل المثال) لاحقًا على المثيل.
إذا كنت ترغب في ضبط تكوين وكيل AMQP وتعديله على احتياجاتك الدقيقة ، فهناك القليل من العمل الذي يجب القيام به. يجب عليك توفير ملف تكوين (انظر: Maks-AMQP-Agent-Config.php والانتباه إلى التعليقات). ليس عليك توفير كل شيء ، يمكنك ببساطة كتابة المعلمات التي تريد الكتابة فوقها ، وكيل AMQP ذكي بما يكفي لإلحاق النقص. يمكن أيضًا كتابة هذه المعلمات لاحقًا من خلال تدوين المهمة العامة أو استدعاء طريقة.
حقيقة: يستخدم Amqp Agent نفس أسماء المعلمات مثل PHP-AMQPlib في ملف التكوين وفي صفيف المعلمات تم تمريره على استدعاء الطريقة.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; ملاحظة: أسماء المفاتيح من المستوى الأول Array (الملاحقة مع Options ) خاصة بعامل AMQP.
قبل أن نبدأ بأمثلة ، يتعين علينا توضيح بعض الأشياء. تجدر الإشارة إلى أنه من البداية مع AmqP Agent ، هناك طرق متعددة لكيفية استرداد عامل ، وهناك طريقة بسيطة ، والطريقة الموصى بها ، والطرق الأكثر تقدماً. بعد استرداد عامل ، يشبه الطين ، يمكنك تكوينه بالطريقة التي تريدها. يستوعب هذا التصميم المعياري بأمان احتياجاتك ، ويقود إلى قاعدة كود قابلة للتطوير ، ويجعل الجميع سعداء.
new . بهذه الطريقة تتطلب تمرير المعلمات عبر مُنشئ أو مكالمات الأسلوب أو تعيين الممتلكات العامة.PublisherSingleton::getInstance() . بهذه الطريقة تتطلب تمرير المعلمات عبر طريقة getInstance() أو مكالمات الطريقة أو تعيين الممتلكات العامة.Client . بهذه الطريقة تجعل الكود أكثر قابلية للقراءة حيث يتم استرداد المعلمات من التكوين الذي تم تمريره. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();حقيقة: عند توفير المعلمات توفر فقط المعلمات التي تحتاجها. AMQP Agent ذكي بما يكفي لإلحاق النقص.
نصيحة: يمكنك تبسيط المنشآت الثقيلة المكتوبة في الأمثلة أعلاه إذا كنت تستخدم get($className) على مثيل من فئة Client بعد توفير ملف تكوين بالمعلمات التي تريدها.
ملاحظة: راجع مستندات AMQP Agent للشرح الكامل للطرق. ارجع إلى وثائق RABBITMQ و PHP-AMQPLIB للتفسير الكامل للمعلمات.
في هذه الأمثلة ، سترى كيف ستعمل مع Amqp Agent في سيناريو العالم الحقيقي.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();حقيقة: يمكنك جعل الرمز في Publisher/Consumer Advanced Advanced Advanced Ampressions أكثر سهولة إذا قمت بإجراء تغييرات جميع المعلمات في ملف التكوين ونقله إلى العميل بدلاً من الافتراضي.
النصيحة: تم توثيق قاعدة رمز AMQP بشكل جيد ، يرجى الرجوع إلى هذا الرابط لإلقاء نظرة على جميع الفئات والأساليب.
AMQP Agent عبارة عن حزمة مفتوحة المصدر مرخصة بموجب GNU LGPL V2.1 بسبب ترخيص PHP-AMQPLIB.
حقوق الطبع والنشر (ج) 2020 مروان السولتاني. جميع الحقوق محفوظة.