Un emballage élégant autour du célèbre PHP-AMQPLIB pour un cas d'utilisation à 90%.
Installation
À propos de l'agent AMQP
API
Documentation
Configuration
Exemples
Links
Licence
Changelog
Essayez l'agent AMQP maintenant:
composer require marwanalsoltany/amqp-agent Copiez cette configuration dans votre composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Courir:
composer update Remarque: L'agent AMQP prend en charge maintenant PHP 7.1 par défaut à partir de la version V1.1.1, si vous avez utilisé la branche php7.1-compatibility dans les anciennes versions, mettez à jour votre composer.json!
L'agent AMQP essaie de simplifier la mise en œuvre d'un courtier de message dans un projet PHP. Il enlève toute la surcharge de la construction et de la configuration d'objets ou de la création de classes dont vous auriez besoin pour parler avec RabbitMQ Server (via PHP-AMQPLIB ) et expose une API testée, entièrement configurable et flexible qui correspond à presque tous les projets.
La bibliothèque PHP-AMQPLIB est géniale et fonctionne très bien. Le seul et unique problème est qu'il est assez nu d'être utilisé dans un projet, sans refaire vos propres cours de wrapper, il est presque impossible de ne pas écrire de code spaghetti. De plus, l'énorme quantité de fonctions, de méthodes et de configurations (paramètres) qui l'accompagnent rendent très difficile la mise en œuvre d'une API raisonnable à utiliser. L'agent AMQP résout ce problème en faisant autant d'abstraction que possible sans perdre le contrôle des travailleurs et en ramenant la terminologie associée aux courtiers de message, un éditeur et un consommateur est tout ce que vous devez traiter si vous êtes un nouveau venu.
Selon cette devise, l'agent AMQP rend le travail avec RabbitMQ aussi amusant et élégant que possible en exposant des interfaces courantes qui sont intelligemment mises en œuvre, en forme de développement de PHP moderne, agréable à travailler et très simple à utiliser; Pourtant, très puissant et peut écraser les plus petites bizarreries à tout moment de travailler avec le travailleur. Avec AMQP Agent, vous pouvez commencer à publier et à consommer des messages avec seulement quelques lignes de code!
L'agent AMQP ne remplace rien de PHP-AMQPLIB ni ne modifie la terminologie associée à ses fonctions. Cela ne fait que le simplifier; supprime le bruit des noms des fonctions et l'étend à certains endroits. Il ajoute également de belles fonctionnalités telles que les commands de travailleurs, l'attente dynamique des canaux et les méthodes de facilitation.
L'agent AMQP propose également un puissant client RPC basé sur des événements et un serveur RPC pour vos projets IoT.
Travailler avec l'agent AMQP peut être aussi simple que:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();L'agent AMQP expose un certain nombre de classes concrètes qui peuvent être directement utilisées et d'autres classes abstraites qui peuvent être étendues. Ces deux varations de classe ont également une sous-division d'assistance.
| Classe | Description | API |
|---|---|---|
Abstraitworker *A | Une classe abstraite implémentant la fonctionnalité de base d'un travailleur. | Doc |
Éditeur *C*S | Une classe spécialisée dans l'édition. Implémentation uniquement des méthodes nécessaires pour un éditeur. | Doc |
Consommateur *C*S | Une classe spécialisée dans la consommation. Implémentation uniquement des méthodes nécessaires pour un consommateur. | Doc |
RésumétendPoint *A | Une classe abstraite implémentant les fonctionnalités de base d'un point final. | Doc |
Clienttendpoint *C | Une classe spécialisée dans la demande. Implémentation uniquement des méthodes nécessaires pour un client. | Doc |
Severendpoint *C | Une classe spécialisée dans la réponse. Implémentation uniquement des méthodes nécessaires pour un serveur. | Doc |
AmqpagentParameters *C*H | Une classe qui contient tous les paramètres d'agent AMQP sous forme de constantes. | Doc |
Utilité *C*H | Une classe contenant des fonctions d'assistance diverses. | Doc |
Événement *C*H | Une classe simple pour gérer les événements (répartition et écoute). | Doc |
Arrayproxy *C*H | Une classe contenant des méthodes pour la manipulation et les tableaux de travail. | Doc |
ClassProxy *C*H | Une classe contenant des méthodes pour les méthodes proxy appelant, manipulation des propriétés et utilitaires de classe. | Doc |
IdGenerator *C*H | Une classe contenant des fonctions pour générer des ID et des jetons uniques | Doc |
Sérialiseur *C*H | Un sérialiseur flexible à utiliser en conjonction avec les travailleurs. | Doc |
Enregistreur *C*H | Une classe pour écrire des journaux, exposant des méthodes qui fonctionnent statiquement et sur l'instanciation. | Doc |
Singleton *A*H | Une classe abstraite mettant en œuvre la fonctionnalité fondamentale d'un singleton. | Doc |
Config *C*R | Une classe qui transforme le fichier de configuration en un objet. | Doc |
Client *C*R | Une classe renvoie tout ce que l'agent AMQP a à offrir. Un conteneur de service simple pour dire. | Doc |
Exemple *A*H | Une classe abstraite utilisée comme rappel par défaut pour le consommateur. | Doc |
Voir aussi: AbstractWorkersingleton, Publishersingleton, Consumersingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerComandtrait, AbstrAndPointInterface, ClienttendPointpoint, SeverEndPointInterface, Eventtraitrs.
*C Concrete: cette classe est une classe de béton et peut être instanciée directement.*A résumé: cette classe est une classe abstraite et ne peut pas être instanciée directement.*H Helper: cette classe est une classe d'assistance. Les alternatives tierces peuvent être utilisées librement à la place.*R recommandé: cette classe est recommandée pour être utilisée lorsque vous travaillez avec AMQP Agent (meilleure pratique).*S Singleton: Ce cours a une version singleton disponible via le suffixe du nom de classe avec Singleton et peut être récupéré via *Singleton::getInstance() , IE Publisher -> PublisherSingleton .Remarque: Singleton est considéré comme un anti-motif, essayez de l'éviter autant que possible, bien qu'il y ait des cas d'utilisation pour cela. Utilisez des singletons uniquement si vous savez ce que vous faites.
Si vous souhaitez simplement publier et consommer des messages, tout est déjà prêt et configuré, l'agent AMQP est expédié avec une configuration testée qui suit les meilleures pratiques. Vous pouvez simplement importer la classe Publisher et / ou la classe Consumer dans votre fichier et écraser les paramètres que vous souhaitez (RabbitMQ Identials par exemple) plus tard sur l'instance.
Si vous souhaitez affiner et modifier la configuration de l'agent AMQP selon vos besoins exacts, il y a un peu de travail à faire. Vous devez fournir un fichier de configuration (voir: maks-amqp-agent-config.php et faire attention aux commentaires). Vous n'avez pas à tout fournir, vous pouvez simplement écrire les paramètres que vous souhaitez écraser, l'agent AMQP est assez intelligent pour ajouter la carence. Ces paramètres peuvent également être remplacés plus tard par la notation d'attribution publique ou l'appel par méthode.
FAIT: L'agent AMQP utilise les mêmes noms de paramètres que PHP-AMQPLIB dans le fichier de configuration et dans le tableau de paramètres transmis sur l'appel de méthode.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; Remarque: les noms de touches de premier niveau du tableau (suffixés avec Options ) sont spécifiques à l'agent AMQP.
Avant de commencer par des exemples, nous devons clarifier certaines choses. Il convient de mentionner depuis le début qu'avec l'agent AMQP, il existe plusieurs façons de récupérer un travailleur, il y a le moyen simple, la manière recommandée et les façons les plus avancées. Après avoir récupéré un travailleur, c'est comme de l'argile, vous pouvez le former comme vous le souhaitez. Cette conception modulaire répond gracieusement à vos besoins, se déroule dans une base de code évolutive et rend tout simplement tout le monde.
new mot-clé. De cette façon, il faut passer des paramètres via le constructeur, les appels de méthode ou l'attribution de propriété publique.PublisherSingleton::getInstance() . De cette façon, il faut passer des paramètres via la méthode getInstance() , les appels de méthode ou l'attribution de propriété publique.Client . De cette façon, le code est également plus lisible car les paramètres sont récupérés à partir de la configuration passée. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();FAIT: Lorsque la fourniture de paramètres ne fournit que les paramètres dont vous avez besoin. L'agent AMQP est assez intelligent pour ajouter la carence.
Conseil: vous pouvez simplifier les constructeurs lourds écrits dans les exemples ci-dessus si vous utilisez get($className) sur une instance de la classe Client après avoir fourni un fichier de configuration avec les paramètres que vous souhaitez.
Remarque: reportez-vous aux documents d'agent AMQP pour l'explication complète des méthodes. Reportez-vous à la documentation RabbitMQ et PHP-AMQPLIB pour l'explication complète des paramètres.
Dans ces exemples, vous verrez comment vous travailleriez avec l'agent AMQP dans un scénario du monde réel.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();FAIT: Vous pouvez rendre le code dans Publisher / Consumer Advanced Exemples Beaucoup plus plus facilement si vous apportez tous les modifications de tous les paramètres dans un fichier de configuration et passez au client au lieu de la valeur par défaut.
Conseils: la base de code AMQP Agent est bien documentée, veuillez vous référer à ce lien pour jeter un œil sur toutes les classes et méthodes.
L'agent AMQP est un package open source autorisé dans le cadre de la licence GNU LGPL V2.1 en raison de la licence PHP-AMQPLIB.
Copyright (C) 2020 Marwan al-Soltany. Tous droits réservés.