Una envoltura elegante alrededor del famoso PHP-AMQPLIB para un caso de uso del 90%.
Instalación
Acerca del agente de AMQP
API
Documentación
Configuración
Ejemplos
Campo de golf
Licencia
Colegio de cambios
Prueba el agente de AMQP ahora:
composer require marwanalsoltany/amqp-agent Copie esta configuración en su composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Correr:
composer update Nota: El agente AMQP admite ahora PHP 7.1 de forma predeterminada a partir de la versión v1.1.1, si usó la rama php7.1-compatibility en versiones anteriores, ¡actualice su compositor.json!
El agente de AMQP intenta simplificar la implementación de un corredor de mensajes en un proyecto PHP. Quita toda la sobrecarga de construir y configurar objetos o crear clases que necesitaría para hablar con el servidor RabbitMQ (a través de PHP-AMQPLIB ) y expone una API probada, totalmente configurable y flexible que se ajusta a casi cualquier proyecto.
La biblioteca PHP-AMQPLIB es increíble y funciona muy bien. El único problema es que es bastante desnudo usarse en un proyecto, sin rehacer sus propias clases de envoltura, es casi imposible no escribir código de espagueti. Además, la enorme cantidad de funciones, métodos y configuraciones (parámetros) que vienen con ella hace que sea realmente difícil implementar una API razonable para ser utilizada. El agente de AMQP resuelve este problema al hacer la mayor abstracción posible sin perder el control sobre los trabajadores y recuperar la terminología asociada con los corredores de mensajes, un editor y un consumidor es todo lo que necesita tratar si es un recién llegado.
Según este lema, el agente de AMQP hace que trabajar con RabbitMQ sea lo más divertido y elegante posible al exponer algunas interfaces fluidas que se implementan inteligentemente, se ajustan al desarrollo de PHP moderno, agradable de trabajar y muy simple de usar; Sin embargo, muy poderoso y puede sobrescribir las peculiaridades más pequeñas en cualquier momento de trabajar con el trabajador. ¡Con el agente AMQP puede comenzar a publicar y consumir mensajes con solo unas pocas líneas de código!
El agente de AMQP no sobrescribe nada de PHP-AMQPLIB ni cambia la terminología asociada con sus funciones. Solo lo simplifica; Saca el ruido de los nombres de las funciones y lo extiende en algunos lugares. También agrega algunas características agradables como los comandos de trabajadores, los métodos dinámicos del canal y la facilitación.
AMQP Agent también ofrece un poderoso cliente RPC basado en eventos y servidor RPC para sus proyectos IoT.
Trabajar con AMQP Agent puede ser tan fácil como:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();El agente AMQP expone una serie de clases de concreto que se pueden usar directamente y otras clases abstractas que se pueden extender. Estos dos variantes de clase también tienen una subdivisión auxiliar.
| Clase | Descripción | API |
|---|---|---|
Abstractworker *A | Una clase abstracta que implementa la funcionalidad básica de un trabajador. | Doc |
Editor *C*S | Una clase especializada en publicación. Implementación solo de los métodos necesarios para un editor. | Doc |
Consumidor *C*S | Una clase especializada en consumo. Implementando solo los métodos necesarios para un consumidor. | Doc |
AbstractEndpoint *A | Una clase abstracta que implementa la funcionalidad básica de un punto final. | Doc |
Punto de cliente *C | Una clase especializada en solicitar. Implementación solo de los métodos necesarios para un cliente. | Doc |
ServerEndpoint *C | Una clase especializada en responder. Implementación solo de los métodos necesarios para un servidor. | Doc |
AmqPagentParameters *C*H | Una clase que contiene todos los parámetros del agente AMQP como constantes. | Doc |
Utilidad *C*H | Una clase que contiene funciones de ayudantes diversos. | Doc |
Evento *C*H | Una clase simple para manejar eventos (despacho y escucha). | Doc |
ArrayProxy *C*H | Una clase que contiene métodos para manipular y trabajar en matrices. | Doc |
Classproxy *C*H | Una clase que contiene métodos para llamadas de métodos proxy, manipulación de propiedades y utilidades de clase. | Doc |
IDGENERATOR *C*H | Una clase que contiene funciones para generar identificaciones y tokens únicos | Doc |
Serializador *C*H | Un serializador flexible que se utilizará junto con los trabajadores. | Doc |
Logger *C*H | Una clase para escribir registros, exponiendo métodos que funcionan estáticamente y en instanciación. | Doc |
Singleton *A*H | Una clase abstracta que implementa la funcionalidad fundamental de un singleton. | Doc |
Config *C*R | Una clase que convierte el archivo de configuración en un objeto. | Doc |
Cliente *C*R | Una clase devuelve todo lo que el agente de AMQP tiene para ofrecer. Un contenedor de servicio simple, por así decirlo. | Doc |
Ejemplo *A*H | Una clase abstracta utilizada como una devolución de llamada predeterminada para el consumidor. | Doc |
Consulte también: AbstractWorkersingLeton, Publishersingleton, ConsumersingLeton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkermutationTrait, WorkerComandTrait, AbstractPointInterface, ClientendPointInterface, ServerendPointInterface, EventTrait, ArraxyTrait, ClassProxyTrait, AbstractParameters.
*C Concreto: esta clase es una clase concreta y puede instanciarse directamente.*A resumen: Esta clase es una clase abstracta y no puede instanciarse directamente.*H Helper: Esta clase es una clase de ayuda. En su lugar, las alternativas de terceros se pueden usar libremente.*R Recomendado: se recomienda utilizar esta clase cuando se trabaje con AMQP Agent (Mejor práctica).*S Singleton: esta clase tiene una versión de Singleton disponible mediante sufijo el nombre de la clase con Singleton y se puede recuperar a través de *Singleton::getInstance() , es decir, Publisher -> PublisherSingleton .Nota: Singleton se considera un antipatrón, intente evitarlo tanto como sea posible, aunque hay casos de uso para ello. Use singletons solo si sabe lo que está haciendo.
Si simplemente desea publicar y consumir mensajes, todo está listo y configurado ya, AMQP Agent se envía con una configuración probada que sigue las mejores prácticas. Simplemente puede importar clase Publisher y/o clase Consumer en su archivo y sobrescribir los parámetros que desea (credenciales de RabbitMQ, por ejemplo) más adelante en la instancia.
Si desea ajustar y ajustar la configuración del agente AMQP a sus necesidades exactas, hay un poco de trabajo por hacer. Debe suministrar un archivo de configuración (consulte: maks-amqp-agent-config.php y prestar atención a los comentarios). No tiene que suministrar todo, simplemente puede escribir los parámetros que desea sobrescribir, el agente de AMQP es lo suficientemente inteligente como para agregar la deficiencia. Estos parámetros también se pueden sobrescribir más adelante a través de la notación de asignación pública o por llamada de método.
Hecho: AMQP Agent utiliza los mismos nombres de parámetros que PHP-AMQPLIB en el archivo de configuración y en la matriz de parámetros pasada en la llamada del método.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; Nota: Los nombres de claves de primer nivel de matriz (sufijo con Options ) son específicos para el agente AMQP.
Antes de comenzar con ejemplos, tenemos que aclarar algunas cosas. Vale la pena mencionar desde el principio que con el agente de AMQP hay múltiples formas de recuperar a un trabajador, existe la forma simple, la forma recomendada y las formas más avanzadas. Después de recuperar a un trabajador, es como arcilla, puede formarlo de la manera que desee. Este diseño modular acomoda con gracia sus necesidades, conduce a una base de código escalable y simplemente hace felices a todos.
new palabra clave. Esta forma requiere pasar parámetros a través del constructor, llamadas de método o asignación de propiedades públicas.PublisherSingleton::getInstance() . Esta forma requiere pasar los parámetros a través del método getInstance() , las llamadas de método o la asignación de propiedades públicas.Client . De esta manera, también hace que el código sea más legible ya que los parámetros se recuperan de la configuración pasada. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();HECHO: Al suministrar parámetros, solo proporcionan los parámetros que necesita. El agente de AMQP es lo suficientemente inteligente como para agregar la deficiencia.
Consejo: puede simplificar los constructores pesados escritos en los ejemplos anteriores si usa get($className) en una instancia de la clase Client después de proporcionar un archivo de configuración con los parámetros que desea.
Nota: Consulte los documentos del agente AMQP para obtener la explicación completa de los métodos. Consulte la documentación de RabbitMQ y PHP-AMQPLIB para obtener la explicación completa de los parámetros.
En estos ejemplos, verá cómo trabajaría con el agente AMQP en un escenario del mundo real.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();Hecho: puede hacer que el código en los ejemplos avanzados del editor/consumidor sea mucho más más fácil si realiza todos los cambios de parámetros en un archivo de configuración y lo pasa al cliente en lugar del valor predeterminado.
Consejo: la base de código de agente de AMQP está bien documentada, consulte este enlace para ver todas las clases y métodos.
AMQP Agent es un paquete de código abierto con licencia bajo la GNU LGPL V2.1 debido a la licencia PHP-AMQPLIB.
Copyright (c) 2020 Marwan al-Soltany. Reservados todos los derechos.