Элегантная обертка вокруг знаменитой PHP-AMQPlib для 90% вариантов использования.
Установка
О агенте AMQP
API
Документация
Конфигурация
Примеры
Ссылки
Лицензия
Изменение
Попробуйте агент AMQP сейчас:
composer require marwanalsoltany/amqp-agent Скопируйте эту конфигурацию в вашем composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Бегать:
composer update Примечание. Агент AMQP поддерживает теперь PHP 7.1 по умолчанию, начиная с версии V1.1.1, если вы использовали филиал php7.1-compatibility в более старых версиях, обновите свой Composer.json!
Агент AMQP пытается упростить реализацию брокера сообщения в PHP-проекте. Он убирает весь накладной сигнал построения и настройки объектов или создания классов, которые вам понадобятся для общения с сервером RabbitMQ (через PHP-AMQPlib ), и выявляет проверенный, полностью настраиваемый и гибкий API, который соответствует практически любому проекту.
Библиотека PHP-AMQPLIB потрясающая и очень хорошо работает. Единственная проблема заключается в том, что в проекте довольно нельзя использовать косточки, не переработав свои собственные классы обертки, почти невозможно не писать код спагетти. Кроме того, огромное количество функций, методов и конфигураций (параметров), которые поставляются с ним, затрудняют реализацию разумного API, который будет использоваться. Агент AMQP решает эту проблему, делая как можно больше абстракции, не теряя контроля над работниками и возвращая терминологию, связанную с сообщениями сообщений, издателем и потребителем-это все, с чем вам нужно иметь дело, если вы новичок.
Согласно этому девизу, агент AMQP делает работу с RabbitMQ настолько веселой и элегантной, насколько это возможно, разоблачая некоторые свободные интерфейсы, которые умно реализованы, соответствуют современному развитию PHP, с приятной для работы и очень простыми в использовании; И все же очень мощный и может перезаписать самые маленькие причуды в любой момент работы с работником. С агентом AMQP вы можете начать публиковать и потреблять сообщения только с несколькими строками кода!
Агент AMQP не перезаписывает ничего из PHP-AMQPLIB, и не меняет терминологию, связанную с его функциями. Это только упрощает его; Убирает шум имен функций и расширяет его в некоторых местах. Он также добавляет несколько хороших функций, таких как работники, динамические ожидания канала и методы облегчения.
Агент AMQP также предлагает мощный клиент RPC на основе событий для ваших проектов IoT.
Работа с агентом AMQP может быть такой же простой, как:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();Агент AMQP обнажает ряд конкретных классов, которые можно использовать непосредственно, и другие абстрактные классы, которые можно расширить. Эти два классовые варианты также имеют вспомогательное подразделение.
| Сорт | Описание | API |
|---|---|---|
Abstractworker *A | Абстрактный класс, реализующий основные функции работника. | Док |
Издатель *C*S | Класс специализируется на публикации. Реализация только методов, необходимых для издателя. | Док |
Потребитель *C*S | Класс специализируется на потреблении. Реализация только методов, необходимых для потребителя. | Док |
AbstractendPoint *A | Абстрактный класс, реализующий основную функциональность конечной точки. | Док |
ClientendPoint *C | Класс специализируется на запросе. Реализация только методов, необходимых для клиента. | Док |
ServerEndPoint *C | Класс специализируется на ответе. Реализация только методов, необходимых для сервера. | Док |
Amqpagentparameters *C*H | Класс, который содержит все параметры агента AMQP в качестве константы. | Док |
Утилита *C*H | Класс, содержащий разные вспомогательные функции. | Док |
Событие *C*H | Простой класс для обработки событий (отправка и прослушивание). | Док |
ArrayProxy *C*H | Класс, содержащий методы для манипуляции и рабочих массивов. | Док |
ClassProxy *C*H | Класс, содержащий методы для прокси -методов, вызовы, манипулирование свойствами и утилиты классов. | Док |
IdGenerator *C*H | Класс, содержащий функции для генерации уникальных идентификаторов и токенов | Док |
Сериализатор *C*H | Гибкий сериализатор, который будет использоваться в сочетании с работниками. | Док |
Регистратор *C*H | Класс для записи журналов, разоблачение методов, которые работают статически и на экземпляре. | Док |
Singleton *A*H | Абстрактный класс, внедряющий фундаментальную функциональность синглтона. | Док |
Конфигурация *C*R | Класс, который превращает файл конфигурации в объект. | Док |
Клиент *C*R | Класс возвращает все, что может предложить агент AMQP. Простой сервисный контейнер так сказать. | Док |
Пример *A*H | Абстрактный класс, используемый в качестве обратного вызова по умолчанию для потребителя. | Док |
См. Также: Abstractworkersingleton, Publishersingleton, Consumersingleton, AbstractworkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMatationTrait, WorkerCommandtrait, AbstractendPointInterface, ClientEntInterface, ServeldPointInterface, EventTrait, ArrayProxyTrait, ClassProxytrait, Abstractpointerface.
*C Concrete: этот класс является бетонным классом и может быть создан напрямую.*A Аннотация: этот класс является абстрактным классом и не может быть создан напрямую.*H Helper: Этот класс - помощник. Сторонние альтернативы могут быть свободно использованы вместо этого.*R Рекомендуется: этот класс рекомендуется использовать при работе с агентом AMQP (лучшая практика).*S Singleton: этот класс имеет синглтонскую версию, доступную через суффиксы названия класса с Singleton , и может быть получена через *Singleton::getInstance() , IE Publisher -> PublisherSingleton .Примечание: Singleton считается анти-паттерном, старайтесь избегать его как можно большего, хотя для него есть коэффициенты использования. Используйте Singletons, только если вы знаете, что делаете.
Если вы просто быстро хотите публиковать и потреблять сообщения, все уже готово и настроено, агент AMQP поставляется с тестируемой конфигурацией, которая следует за лучшими практиками. Вы можете просто импортировать класс Publisher и/или Consumer класс в вашем файле и перезаписать нужные вам параметры (например, учетные данные RabbitMQ) позже на экземпляре.
Если вы хотите точно настроить и настроить конфигурацию агента AMQP в соответствии с вашими потребностями, есть немного работы. Вы должны предоставить файл конфигурации (см.: MAKS-AMQP-AGENT-CONFIG.PHP и обратить внимание на комментарии). Вам не нужно снабжать все, вы можете просто написать только параметры, которые вы хотите перезаписать, агент AMQP достаточно умный, чтобы добавить недостаток. Эти параметры также могут быть перезаписаны позже посредством нотации публичного назначения или по вызову метода.
Факт: агент AMQP использует те же имена параметров, что и PHP-AMQPlib в файле конфигурации, и в массиве параметров, передаваемых при вызове метода.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; ПРИМЕЧАНИЕ. Имена ключей первого уровня массива (суффикс с Options ) специфичны для агента AMQP.
Прежде чем мы начнем с примеров, мы должны уточнить несколько вещей. С самого начала стоит упомянуть, что с агентом AMQP есть несколько способов к тому, как вы можете получить работника, есть простой способ, рекомендуемый способ и более продвинутые способы. После того, как вы получите работника, это как глина, вы можете сформировать его так, как хотите. Этот модульный дизайн изящно учитывает ваши потребности, приводит к масштабируемой кодовой базе и просто делает всех счастливыми.
new ключевого слова. Этот способ требует прохождения параметров с помощью конструктора, вызовов методов или назначения общественного имущества.PublisherSingleton::getInstance() . Этот способ требует передачи параметров с помощью метода getInstance() , вызовов метода или назначения общественного имущества.Client класса. Таким образом, также делает код более читабельным, так как параметры извлекаются из пропущенной конфигурации. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();Факт: при подаче параметров предоставляют только те параметры, которые вам нужны. Агент AMQP достаточно умный, чтобы добавить недостаток.
Консультации: Вы можете упростить тяжелые конструкторы, написанные в приведенных выше примерах, если вы используете get($className) на экземпляре класса Client после предоставления файла конфигурации с параметрами, которые вы хотите.
Примечание. См. Документы агента AMQP для полного объяснения методов. Обратитесь к документации RabbitMQ и PHP-AMQPLIB для полного объяснения параметров.
В этих примерах вы увидите, как вы будете работать с агентом AMQP в реальном сценарии.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();Факт: вы можете сделать код в Publisher/Consumer Advanced примеры намного более промышленными, если вы внесете изменения всех параметров в файле конфигурации и передаете его клиенту, а не по умолчанию.
Консультации: Кодовая база агента AMQP хорошо задокументирована, пожалуйста, обратитесь к этой ссылке, чтобы просмотреть все классы и методы.
Агент AMQP-это пакет с открытым исходным кодом, лицензированный в рамках GNU LGPL V2.1 из-за лицензии PHP-AMQPLIB.
Авторские права (C) 2020 Marwan Al-Soltany. Все права защищены.