Pembungkus elegan di sekitar PHP-AMQPLIB yang terkenal untuk kasus penggunaan 90%.
Instalasi
Tentang agen AMQP
API
Dokumentasi
Konfigurasi
Contoh
Tautan
Lisensi
Changelog
Coba AMQP Agent sekarang:
composer require marwanalsoltany/amqp-agent Salin konfigurasi ini di composer.json Anda:
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Berlari:
composer update Catatan: Agen AMQP mendukung sekarang PHP 7.1 secara default mulai dari versi v1.1.1, jika Anda menggunakan cabang php7.1-compatibility di versi yang lebih lama, perbarui composer.json Anda!
Agen AMQP mencoba menyederhanakan implementasi broker pesan dalam proyek PHP. Ini menghilangkan seluruh overhead membangun dan mengkonfigurasi objek atau membuat kelas yang Anda perlukan untuk berbicara dengan server RabbitMQ (melalui PHP-AMQPLIB ) dan mengekspos API yang diuji, sepenuhnya dapat dikonfigurasi, dan fleksibel yang sesuai dengan hampir semua proyek.
Perpustakaan PHP-AMQPLIB luar biasa dan bekerja dengan sangat baik. Satu-satunya masalah adalah, cukup telanjang untuk digunakan dalam suatu proyek, tanpa membuat ulang kelas pembungkus Anda sendiri, hampir tidak mungkin untuk tidak menulis kode spaghetti. Ditambah jumlah fungsi, metode, dan konfigurasi (parameter) yang sangat besar yang datang membuatnya sangat sulit untuk menerapkan API yang masuk akal untuk digunakan. Agen AMQP memecahkan masalah ini dengan membuat abstraksi sebanyak mungkin tanpa kehilangan kendali atas para pekerja dan dengan mengembalikan terminologi yang terkait dengan pialang pesan, penerbit dan konsumen adalah semua yang perlu Anda tangani jika Anda adalah pendatang baru.
Menurut moto ini, agen AMQP membuat bekerja dengan RabbitMQ sebagai hal yang menyenangkan dan elegan dengan mengekspos beberapa antarmuka lancar yang secara cerdik diimplementasikan, sesuai dengan pengembangan PHP modern, bagus untuk dikerjakan dan sangat mudah digunakan; Namun sangat kuat dan dapat menimpa keanehan terkecil di setiap titik bekerja dengan pekerja. Dengan agen AMQP Anda dapat mulai menerbitkan dan mengonsumsi pesan hanya dengan beberapa baris kode!
Agen AMQP tidak menimpa apa pun dari PHP-AMQPLIB atau tidak mengubah terminologi yang terkait dengan fungsinya. Itu hanya menyederhanakannya; Mengambil kebisingan nama fungsi dan memperluasnya di beberapa tempat. Ini juga menambahkan beberapa fitur bagus seperti perintah pekerja, metode yang menunggu saluran dinamis, dan metode fasilitasi.
Agen AMQP juga menawarkan klien RPC dan server RPC berbasis acara yang kuat untuk proyek IoT Anda.
Bekerja dengan agen AMQP bisa semudah:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();Agen AMQP memaparkan sejumlah kelas konkret yang dapat langsung digunakan dan kelas abstrak lainnya yang dapat diperpanjang. Dua class-varian ini juga memiliki sub-divisi penolong.
| Kelas | Keterangan | API |
|---|---|---|
ABSTRAK PRWERFORMER *A | Kelas abstrak yang mengimplementasikan fungsi dasar seorang pekerja. | Dokter |
Penerbit *C*S | Kelas yang berspesialisasi dalam penerbitan. Hanya menerapkan metode yang dibutuhkan untuk penerbit. | Dokter |
Konsumen *C*S | Kelas yang berspesialisasi dalam konsumsi. Hanya menerapkan metode yang dibutuhkan untuk konsumen. | Dokter |
Abstractendpoint *A | Kelas abstrak yang menerapkan fungsi dasar dari titik akhir. | Dokter |
ClientendPoint *C | Kelas yang berspesialisasi dalam meminta. Hanya menerapkan metode yang dibutuhkan untuk klien. | Dokter |
ServerEndPoint *C | Kelas yang berspesialisasi dalam menanggapi. Hanya menerapkan metode yang dibutuhkan untuk server. | Dokter |
AmqPagentParameters *C*H | Kelas yang berisi semua parameter agen AMQP sebagai konstanta. | Dokter |
Utilitas *C*H | Kelas yang berisi fungsi pembantu lain -lain. | Dokter |
Acara *C*H | Kelas sederhana untuk menangani acara (pengiriman dan mendengarkan). | Dokter |
ArrayProxy *C*H | Kelas yang berisi metode untuk memanipulasi dan array kerja. | Dokter |
ClassProxy *C*H | Kelas yang berisi metode untuk panggilan metode proxy, manipulasi properti, dan utilitas kelas. | Dokter |
IDGenerator *C*H | Kelas yang berisi fungsi untuk menghasilkan ID dan token unik | Dokter |
Serializer *C*H | Serializer fleksibel untuk digunakan bersama dengan para pekerja. | Dokter |
Logger *C*H | Kelas untuk menulis log, mengekspos metode yang bekerja secara statis dan instantiasi. | Dokter |
Singleton *A*H | Kelas abstrak yang mengimplementasikan fungsi dasar singleton. | Dokter |
Config *C*R | Kelas yang mengubah file konfigurasi menjadi objek. | Dokter |
Klien *C*R | Kelas mengembalikan semua yang ditawarkan oleh agen AMQP. Wadah layanan sederhana untuk dikatakan. | Dokter |
Contoh *A*H | Kelas abstrak yang digunakan sebagai panggilan balik default untuk konsumen. | Dokter |
See also: AbstractWorkerSingleton, PublisherSingleton, ConsumerSingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerCommandTrait, AbstractEndpointInterface, ClientEndpointInterface, ServerEndpointInterface, EventTrait, ArrayProxyTrait, ClassProxyTrait, AbstractParameters.
*C Beton: Kelas ini adalah kelas konkret dan dapat dipakai secara langsung.*A Abstrak: Kelas ini adalah kelas abstrak dan tidak dapat dipakai secara langsung.*H Helper: Kelas ini adalah kelas penolong. Alternatif pihak ketiga dapat digunakan secara bebas sebagai gantinya.*R Dianjurkan: Kelas ini disarankan untuk digunakan saat bekerja dengan agen AMQP (praktik terbaik).*S Singleton: Kelas ini memiliki versi singleton yang tersedia melalui suffixing nama kelas dengan Singleton dan dapat diambil melalui *Singleton::getInstance() , yaitu Publisher -> PublisherSingleton .Catatan: Singleton dianggap sebagai anti-pola, cobalah menghindarinya sebanyak mungkin, meskipun ada kasus penggunaan untuk itu. Gunakan singleton hanya jika Anda tahu apa yang Anda lakukan.
Jika Anda dengan cepat ingin mempublikasikan dan mengonsumsi pesan, semuanya sudah siap dan dikonfigurasi, agen AMQP dikirimkan dengan konfigurasi yang diuji yang mengikuti praktik terbaik. Anda cukup mengimpor kelas Publisher dan/atau kelas Consumer dalam file Anda dan menimpa parameter yang Anda inginkan (kredensial RabbitMQ misalnya) nanti pada instance.
Jika Anda ingin menyempurnakan dan mengubah konfigurasi agen AMQP sesuai kebutuhan Anda, ada sedikit pekerjaan yang harus dilakukan. Anda harus menyediakan file konfigurasi (lihat: MAKS-AMQP-agent-config.php dan perhatikan komentar). Anda tidak perlu menyediakan semuanya, Anda hanya dapat menulis parameter yang ingin Anda timpa, agen AMQP cukup pintar untuk menambahkan kekurangan. Parameter ini juga dapat ditimpa nanti melalui notasi penugasan publik atau per metode panggilan.
Fakta: Agen AMQP menggunakan nama parameter yang sama dengan PHP-AMQPLIB di file konfigurasi dan dalam array parameter yang dilewati pada panggilan metode.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; Catatan: Array nama kunci tingkat pertama (sufiks dengan Options ) khusus untuk agen AMQP.
Sebelum kita mulai dengan contoh, kita harus mengklarifikasi beberapa hal. Perlu disebutkan sejak awal bahwa dengan agen AMQP ada banyak cara untuk bagaimana Anda dapat mengambil pekerja, ada cara sederhana, cara yang disarankan, dan cara yang lebih maju. Setelah Anda mengambil pekerja, itu seperti tanah liat, Anda dapat membentuknya seperti yang Anda inginkan. Desain modular ini dengan anggun mengakomodasi kebutuhan Anda, berkendara ke basis kode yang dapat diskalakan, dan hanya membuat semua orang bahagia.
new . Dengan cara ini membutuhkan parameter yang lewat melalui konstruktor, panggilan metode, atau penugasan properti publik.PublisherSingleton::getInstance() . Cara ini membutuhkan parameter yang lewat melalui metode getInstance() , panggilan metode, atau penugasan properti publik.Client . Cara ini juga membuat kode lebih mudah dibaca karena parameter diambil dari konfigurasi yang dilewati. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();Fakta: Saat memasok parameter hanya memberikan parameter yang Anda butuhkan. Agen AMQP cukup pintar untuk menambahkan kekurangan.
Saran: Anda dapat menyederhanakan konstruktor berat yang ditulis dalam contoh di atas jika Anda menggunakan get($className) pada instance kelas Client setelah memberikan file konfigurasi dengan parameter yang Anda inginkan.
Catatan: Lihat dokumen agen AMQP untuk penjelasan lengkap dari metode tersebut. Lihat dokumentasi RabbitMQ dan PHP-AMQPLIB untuk penjelasan lengkap dari parameter.
Dalam contoh-contoh ini, Anda akan melihat bagaimana Anda akan bekerja dengan agen AMQP dalam skenario dunia nyata.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();Fakta: Anda dapat membuat kode di penerbit/konsumen contoh canggih jauh lebih mudah jika Anda membuat semua perubahan parameter dalam file konfigurasi dan meneruskannya ke klien alih -alih default.
Saran: AMQP Agent-base didokumentasikan dengan baik, silakan merujuk ke tautan ini untuk melihat semua kelas dan metode.
AMQP Agent adalah paket bersumber terbuka yang dilisensikan di bawah GNU LGPL v2.1 karena lisensi PHP-AMQPLIB.
Hak Cipta (C) 2020 Marwan al-Soltany. Semua hak dilindungi undang -undang.