Eine elegante Wrapper um das berühmte PHP-AMQPLIB für 90% Anwendungsfall.
Installation
Über AMQP Agent
API
Dokumentation
Konfiguration
Beispiele
Links
Lizenz
Changelog
Versuchen Sie es jetzt mit Amqp Agent:
composer require marwanalsoltany/amqp-agent Kopieren Sie diese Konfiguration in Ihrem composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Laufen:
composer update Hinweis: AMQP Agent unterstützt jetzt standardmäßig PHP 7.1 ab Version V1.1.1, wenn Sie die php7.1-compatibility in älteren Versionen verwendet haben, aktualisieren Sie Ihren Composer.json!
AMQP-Agent versucht, die Implementierung eines Nachrichtenbrokers in einem PHP-Projekt zu vereinfachen. Es nimmt den gesamten Aufwand des Erstellens und Konfigurierens von Objekten oder Erstellen von Klassen weg, die Sie benötigen würden, um mit Rabbitmq Server (über PHP-AMQPLIB ) zu sprechen, und enthält eine getestete, vollständig konfigurierbare und flexible API, die in fast jedem Projekt passt.
Die PHP-AMQPLIB -Bibliothek ist fantastisch und funktioniert sehr gut. Das einzige Problem ist, dass es ziemlich nackt ist, in einem Projekt verwendet zu werden, ohne Ihre eigenen Wrapper-Klassen neu zu erstellen. Es ist fast unmöglich, Spaghetti-Code nicht zu schreiben. Außerdem ist die enorme Menge an Funktionen, Methoden und Konfigurationen (Parametern), die damit verbunden sind, es sehr schwierig, eine vernünftige API zu implementieren, um sie zu verwenden. AMQP Agent löst dieses Problem, indem er so viel Abstraktion wie möglich macht, ohne die Kontrolle über die Arbeitnehmer zu verlieren und die Terminologie, die mit Nachrichtenverbindungen verbunden ist, zurückzubringen, ein Verlag und ein Verbraucher sind alles, was Sie zu tun haben, wenn Sie ein Neuling sind.
Nach diesem Motto macht der AMQP -Agent die Arbeit mit Rabbitmq so unterhaltsam und elegant wie möglich, indem er einige fließende Schnittstellen auslegt, die geschickt implementiert sind, passen moderne PHP -Entwicklung, gut zu arbeiten und sehr einfach zu bedienen. Doch sehr mächtig und kann die kleinsten Macken zu jedem Zeitpunkt der Arbeit mit dem Arbeiter überschreiben. Mit AMQP -Agent können Sie mit nur wenigen Codezeilen mit der Veröffentlichung und Konsum von Nachrichten beginnen und konsumieren!
AMQP-Agent überschreibt weder etwas von PHP-AMQPLIB noch die mit seinen Funktionen verbundene Terminologie. Es vereinfacht es nur; Nimmt das Lärm der Namen der Funktionen heraus und erweitert es an einigen Stellen. Es fügt auch einige nette Funktionen wie Arbeiter-Commands, dynamische Kanalweiter- und Erleichterungsmethoden hinzu.
AMQP Agent bietet auch einen leistungsstarken ereignisbasierten RPC-Client und RPC-Server für Ihre IoT-Projekte an.
Die Arbeit mit AMQP -Agent kann so einfach sein wie:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();AMQP -Agent enthält eine Reihe von konkreten Klassen, die direkt verwendet werden können, und andere abstrakte Klassen, die erweitert werden können. Diese beiden Klassenvarianten haben auch eine Helfer-Unterabteilung.
| Klasse | Beschreibung | API |
|---|---|---|
Abstrakter Arbeiter *A | Eine abstrakte Klasse, die die grundlegende Funktionalität eines Arbeiters implementiert. | Dokument |
Verlag *C*S | Eine Klasse, die auf die Veröffentlichung spezialisiert ist. Implementieren nur die für einen Verlag erforderlichen Methoden. | Dokument |
Verbraucher *C*S | Eine Klasse, die sich auf den Verbrauch spezialisiert hat. Implementieren nur die für einen Verbraucher erforderlichen Methoden. | Dokument |
AbstractendPoint *A | Eine abstrakte Klasse, die die grundlegende Funktionalität eines Endpunkts implementiert. | Dokument |
Clientendpoint *C | Eine Klasse, die auf Anfragen spezialisiert ist. Implementieren nur die für einen Kunden erforderlichen Methoden. | Dokument |
Serverendpoint *C | Eine Klasse, die sich auf die Reaktion spezialisiert hat. Implementieren nur die für einen Server erforderlichen Methoden. | Dokument |
AmqpagentParameters *C*H | Eine Klasse, die alle AMQP -Agentenparameter als Konstanten enthält. | Dokument |
Dienstprogramm *C*H | Eine Klasse mit verschiedenen Helferfunktionen. | Dokument |
*C*H | Eine einfache Klasse zum Umgang mit Ereignissen (Versand und Zuhören). | Dokument |
ArrayProxy *C*H | Eine Klasse, die Methoden zum Manipulieren und Arbeitsbereich enthält. | Dokument |
Klassiker *C*H | Eine Klasse, die Methoden für Proxy -Methoden enthält, die aufrufen, Eigenschaftenmanipulation und Klassenversorger. | Dokument |
IDGenerator *C*H | Eine Klasse, die Funktionen zum Generieren von eindeutigen IDs und Token enthält | Dokument |
Serializer *C*H | Ein flexibler Serialisierer, der in Verbindung mit den Arbeitern verwendet werden soll. | Dokument |
Logger *C*H | Eine Klasse zum Schreiben von Protokollen, die Methoden aufdecken, die statisch und zur Instanziierung funktionieren. | Dokument |
Singleton *A*H | Eine abstrakte Klasse, die die grundlegende Funktionalität eines Singletons implementiert. | Dokument |
Config *C*R | Eine Klasse, die die Konfigurationsdatei in ein Objekt verwandelt. | Dokument |
Client *C*R | Eine Klasse gibt alles zurück, was AMQP -Agent zu bieten hat. Ein einfacher Service -Container sozusagen. | Dokument |
Beispiel *A*H | Eine abstrakte Klasse, die als Standard -Rückruf für den Verbraucher verwendet wird. | Dokument |
See also: AbstractWorkerSingleton, PublisherSingleton, ConsumerSingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerCommandTrait, AbstractEndpointInterface, ClientEndpointInterface, ServerEndpointInterface, EventTrait, ArrayProxyTrait, ClassProxyTrait, AbstractParameters.
*C Beton: Diese Klasse ist eine konkrete Klasse und kann direkt instanziiert werden.*A Zusammenfassung: Diese Klasse ist eine abstrakte Klasse und kann nicht direkt instanziiert werden.*H Helfer: Diese Klasse ist eine Helferklasse. Alternativen von Drittanbietern können stattdessen frei verwendet werden.*R empfohlen: Diese Klasse wird empfohlen, bei der Arbeit mit AMQP Agent (Best Practice) verwendet zu werden.*S Singleton: Diese Klasse verfügt über eine Singleton -Version, die über den Klassennamen mit Singleton erhältlich ist und über *Singleton::getInstance() , dh Publisher -> PublisherSingleton , abgerufen werden kann.Hinweis: Singleton wird als Anti-Muster angesehen. Versuchen Sie, es so weit wie möglich zu vermeiden, obwohl es Anwendungsfälle dafür gibt. Verwenden Sie Singletons nur, wenn Sie wissen, was Sie tun.
Wenn Sie nur schnell Nachrichten veröffentlichen und konsumieren möchten, ist alles bereits fertig und konfiguriert, AMQP -Agent wird mit einer getesteten Konfiguration versendet, die den besten Verfahren folgt. Sie können einfach Publisher -Klasse und/oder Consumer in Ihre Datei importieren und die gewünschten Parameter (z. B. Rabbitmq -Anmeldeinformationen) später auf der Instanz überschreiben.
Wenn Sie die AMQP-Agent-Konfiguration für Ihre genauen Anforderungen feinstimmen und optimieren möchten, müssen Sie etwas Arbeit erledigen. Sie müssen eine Konfigurationsdatei angeben (siehe: MAKS-AMQP-Agent-config.php und achten Sie auf die Kommentare). Sie müssen nicht alles liefern, Sie können einfach nur die Parameter schreiben, die Sie überschreiben möchten. AMQP -Agent ist klug genug, um den Mangel anzuhängen. Diese Parameter können auch später durch öffentliche Zuweisungsnotation oder pro Methodenaufruf überschrieben werden.
Fakt: AMQP-Agent verwendet die gleichen Parameternamen wie PHP-AMQPLIB in der Konfigurationsdatei und im Parameter-Array, das den Methodenaufruf übergeben hat.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; HINWEIS: Array-Namen der ersten Stufe (mit Options angezeigt) sind spezifisch für AMQP-Agent.
Bevor wir mit Beispielen beginnen, müssen wir ein paar Dinge klären. Erwähnenswert ist von Anfang an, dass es mit AMQP -Agent mehrere Möglichkeiten gibt, wie Sie einen Arbeiter abrufen können, es gibt den einfachen Weg, den empfohlenen Weg und die fortgeschritteneren Wege. Nachdem Sie einen Arbeiter abgerufen haben, ist es wie Ton, Sie können ihn so formen, wie Sie es möchten. Dieses modulare Design bietet anmutig Ihre Bedürfnisse, fährt zu einer skalierbaren Codebasis und macht einfach alle glücklich.
new Schlüsselwort. Auf diese Weise müssen Parameter über den Konstruktor, die Methodenaufrufe oder die Zuordnung öffentlicher Eigenschaften übertragen werden.PublisherSingleton::getInstance() . Auf diese Weise erfordert die Übergabe von Parametern über getInstance() -Methode, Methodenaufrufe oder Zuweisung von öffentlichen Eigenschaften.Client . Auf diese Weise macht Code auch lesbarer, wenn die Parameter aus der bestandenen Konfiguration abgerufen werden. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();Fakt: Wenn die Lieferung von Parametern nur die benötigten Parameter bereitstellen. AMQP -Agent ist intelligent genug, um den Mangel anzuhängen.
Ratschläge: Sie können die starken Konstruktoren in den obigen Beispielen vereinfachen, wenn Sie get($className) in einer Instanz der Client -Klasse verwenden, nachdem Sie eine Konfigurationsdatei mit den gewünschten Parametern bereitgestellt haben.
Hinweis: Weitere Erläuterungen der Methoden finden Sie in AMQP -Agenten -Dokumenten. Die vollständige Erläuterung der Parameter finden Sie in der RabbitMQ-Dokumentation und PHP-AMQPLIB.
In diesen Beispielen sehen Sie, wie Sie in einem realen Szenario mit AMQP-Agent zusammenarbeiten würden.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();Fakt: Sie können den Code in Publisher/Consumer Advanced Beispielen weitaus easer erstellen, wenn Sie alle Änderungen der Parameter in einer Konfigurationsdatei vornehmen und an den Client anstelle der Standardeinstellung weitergeben.
Ratschläge: AMQP Agent Code-Base ist gut dokumentiert. Weitere Informationen zu allen Klassen und Methoden finden Sie in diesem Link.
AMQP Agent ist ein Open-Sourcing-Paket, das aufgrund der PHP-AMQPLIB-Lizenz unter der GNU LGPL V2.1 lizenziert wurde.
Copyright (C) 2020 Marwan Al-Soltany. Alle Rechte vorbehalten.