Um invólucro elegante em torno do famoso PHP-amqplib para 90% de caso de uso.
Instalação
Sobre o agente AMQP
API
Documentação
Configuração
Exemplos
Links
Licença
Changelog
Experimente o agente AMQP agora:
composer require marwanalsoltany/amqp-agent Copie esta configuração em seu composer.json :
"repositories" : {
"amqp-agent-repo" : {
"type" : " vcs " ,
"url" : " https://github.com/MarwanAlsoltany/amqp-agent.git "
}
},
"require" : {
"marwanalsoltany/amqp-agent" : " dev-dev "
},
"minimum-stability" : " dev "Correr:
composer update NOTA: O agente AMQP suporta agora o PHP 7.1 por padrão, a partir da versão v1.1.1, se você usou a filial php7.1-compatibility em versões mais antigas, atualize seu composer.json!
O agente AMQP tenta simplificar a implementação de um corretor de mensagens em um projeto PHP. Ele tira toda a sobrecarga da construção e configuração de objetos ou criação de classes necessárias para conversar com o RabbitMQ Server (através do PHP-amqplib ) e expõe uma API testada, totalmente configurável e flexível que se encaixa em quase qualquer projeto.
A biblioteca Php-amqplib é incrível e funciona muito bem. O único problema é que é muito nua ser usada em um projeto, sem refazer suas próprias aulas de wrapper, é quase impossível não escrever código de espaguete. Além disso, a enorme quantidade de funções, métodos e configurações (parâmetros) que o acompanham dificultam a implementação de uma API razoável a ser usada. O agente AMQP resolve esse problema, tornando o máximo possível de abstração sem perder o controle sobre os trabalhadores e trazendo de volta a terminologia associada a corretores de mensagens, um editor e um consumidor é tudo o que você precisa lidar se for um recém-chegado.
De acordo com este lema, o agente AMQP torna o trabalho com o RabbitMQ o mais divertido e elegante possível, expondo algumas interfaces fluentes que são implementadas inteligentemente, ajustem o desenvolvimento moderno do PHP, bom de se trabalhar e muito simples de usar; No entanto, muito poderoso e pode substituir as menores peculiaridades em qualquer ponto de trabalho com o trabalhador. Com o agente AMQP, você pode começar a publicar e consumir mensagens com apenas algumas linhas de código!
O agente AMQP não substitui nada do PHP-AMQPLIB nem altera a terminologia associada às suas funções. Isso apenas simplifica; Tira o barulho dos nomes das funções e o estende em alguns lugares. Ele também adiciona alguns recursos legais, como trabalhadores comandos, métodos dinâmicos de espera de canal e facilitação.
O AMQP Agent também oferece um poderoso cliente RPC e servidor RPC baseado em eventos para seus projetos de IoT.
Trabalhar com o agente AMQP pode ser tão fácil quanto:
// Publisher
$ publisher = new Publisher ();
$ publisher -> work ( $ messages );
// Consumer
$ consumer = new Consumer ();
$ consumer -> work ( $ callback );
// RPC Client
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( $ request );
$ rpcClient -> disconnect ();
// RPC Server
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( $ callback );
$ rpcServer -> disconnect ();O agente AMQP expõe uma série de classes de concreto que podem ser usadas diretamente e outras classes abstratas que podem ser estendidas. Esses dois variantes de classe também têm uma subdivisão auxiliar.
| Aula | Descrição | API |
|---|---|---|
AbstractWorker *A | Uma classe abstrata implementando a funcionalidade básica de um trabalhador. | Doc |
Editor *C*S | Uma aula especializada em publicação. Implementando apenas os métodos necessários para um editor. | Doc |
Consumidor *C*S | Uma aula especializada em consumir. Implementando apenas os métodos necessários para um consumidor. | Doc |
AbstractEndPoint *A | Uma classe abstrata implementando a funcionalidade básica de um ponto final. | Doc |
ClientEndPoint *C | Uma aula especializada em solicitar. Implementando apenas os métodos necessários para um cliente. | Doc |
ServerEndPoint *C | Uma aula especializada em responder. Implementando apenas os métodos necessários para um servidor. | Doc |
AMQPAGENTPARAMETERS *C*H | Uma classe que contém todos os parâmetros do agente AMQP como constantes. | Doc |
Utilidade *C*H | Uma classe contendo funções auxiliares diversas. | Doc |
Evento *C*H | Uma classe simples para lidar com eventos (despacho e escuta). | Doc |
ArrayProxy *C*H | Uma classe que contém métodos para manipular e trabalhar matrizes. | Doc |
ClassProxy *C*H | Uma classe que contém métodos para os métodos de proxy chamando, manipulação de propriedades e utilitários de classe. | Doc |
IDGenerator *C*H | Uma classe que contém funções para gerar IDs e tokens exclusivos | Doc |
Serializer *C*H | Um serializador flexível para ser usado em conjunto com os trabalhadores. | Doc |
Logger *C*H | Uma classe para escrever logs, expondo métodos que funcionam estaticamente e em instanciação. | Doc |
Singleton *A*H | Uma classe abstrata implementando a funcionalidade fundamental de um singleton. | Doc |
Config *C*R | Uma classe que transforma o arquivo de configuração em um objeto. | Doc |
Cliente *C*R | Uma classe retorna tudo o que o agente AMQP tem a oferecer. Um recipiente de serviço simples, por assim dizer. | Doc |
Exemplo *A*H | Uma classe abstrata usada como retorno de chamada padrão para o consumidor. | Doc |
See also: AbstractWorkerSingleton, PublisherSingleton, ConsumerSingleton, AbstractWorkerInterface, PublisherInterface, ConsumerInterface, WorkerFacilitationInterface, WorkerMutationTrait, WorkerCommandTrait, AbstractEndpointInterface, ClientEndpointInterface, ServerEndpointInterface, EventTrait, ArrayProxyTrait, ClassProxyTrait, AbstractParameters.
*C Concreto: Esta classe é uma classe de concreto e pode ser instanciada diretamente.*A resumo: Esta classe é uma classe abstrata e não pode ser instanciada diretamente.*H Helper: Esta classe é uma classe auxiliar. As alternativas de terceiros podem ser usadas livremente.*R recomendado: Recomenda -se que esta classe seja usada ao trabalhar com o agente AMQP (prática recomendada).*S Singleton: Esta classe tem uma versão singleton disponível por sufixo do nome da classe com Singleton e pode ser recuperado via *Singleton::getInstance() , ou seja, Publisher -> PublisherSingleton .Nota: Singleton é considerado um anti-padronização, tente evitá-lo o máximo possível, embora haja casos de uso para ele. Use singletons apenas se você souber o que está fazendo.
Se você apenas deseja publicar e consumir mensagens, tudo já está pronto e configurado, o agente AMQP será enviado com uma configuração testada que segue as melhores práticas. Você pode simplesmente importar a classe Publisher e/ou Consumer em seu arquivo e substituir os parâmetros desejados (credenciais RabbitMQ, por exemplo) posteriormente na instância.
Se você deseja ajustar e ajustar a configuração do agente AMQP para suas necessidades exatas, há um pouco de trabalho a fazer. Você deve fornecer um arquivo de configuração (consulte: maks-amqp-agent-config.php e prestar atenção aos comentários). Você não precisa fornecer tudo, simplesmente pode escrever apenas os parâmetros que deseja substituir, o agente AMQP é inteligente o suficiente para anexar a deficiência. Esses parâmetros também podem ser substituídos posteriormente através da notação de atribuição pública ou da chamada de método.
Fato: o agente AMQP usa os mesmos nomes de parâmetros que o php-amqplib no arquivo de configuração e na matriz de parâmetros passados na chamada do método.
<?php return [
// Global
' connectionOptions ' => [
' host ' => ' your-rabbitmq-server.com ' ,
' port ' => 5672 ,
' user ' => ' your-username ' ,
' password ' => ' your-password ' ,
' vhost ' => ' / '
],
' queueOptions ' => [
' queue ' => ' your.queue.name ' ,
' durable ' => true ,
' nowait ' => false
],
// Publisher
' exchangeOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' type ' => ' direct '
],
' bindOptions ' => [
' queue ' => ' your.queue.name ' ,
' exchange ' => ' your.exchange.name '
],
' messageOptions ' => [
' properties ' => [
' content_type ' => ' application/json ' ,
' content_encoding ' => ' UTF-8 ' ,
' delivery_mode ' => 2
]
],
' publishOptions ' => [
' exchange ' => ' your.exchange.name ' ,
' routing_key ' => ' your.route.name '
],
// Consumer
' qosOptions ' => [
' prefetch_count ' => 25
],
' waitOptions ' => [
' timeout ' => 3600
],
' consumeOptions ' => [
' queue ' => ' your.queue.name ' ,
' consumer_tag ' => ' your.consumer.name ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback '
]
// RPC Endpoints
'rpcQueueName' => 'your.rpc.queue.name'
]; NOTA: Os nomes de chave de primeiro nível da matriz (sufixo com Options ) são específicos para o agente AMQP.
Antes de começarmos com exemplos, temos que esclarecer algumas coisas. Vale a pena mencionar desde o início que, com o agente AMQP, existem várias maneiras de como você pode recuperar um trabalhador, existe a maneira simples, a maneira recomendada e as maneiras mais avançadas. Depois de recuperar um trabalhador, é como argila, você pode formar da maneira que deseja. Esse design modular acomoda graciosamente suas necessidades, dirige para uma base de código escalável e simplesmente deixa todos felizes.
new palavra -chave. Dessa forma, requer a passagem de parâmetros por meio do construtor, chamadas de método ou atribuição de propriedade pública.PublisherSingleton::getInstance() . Dessa forma, requer a passagem de parâmetros via método getInstance() , chamadas de método ou atribuição de propriedade pública.Client . Dessa forma, também torna o código mais legível à medida que os parâmetros são recuperados da configuração passada. // Instantiating Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Worker PublisherSingleton ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Worker ConsumerSingleton ;
use MAKS AmqpAgent RPC ClientEndpoint ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ publisher1 = new Publisher ( /* parameters can be passed here */ );
$ publisher2 = PublisherSingleton:: getInstance ( /* parameters can be passed here */ );
$ consumer1 = new Consumer ( /* parameters can be passed here */ );
$ consumer2 = ConsumerSingleton:: getInstance ( /* parameters can be passed here */ );
$ rpcClientA = new ClientEndpoint ( /* parameters can be passed here */ );
$ rpcServerA = new ServerEndpoint ( /* parameters can be passed here */ );
// the parameters from this Config object will be passed to the workers.
$ config = new Config ( ' path/to/your/config-file.php ' );
$ client = new Client ( $ config ); // path can also be passed directly to Client
$ publisher3 = $ client -> getPublisher (); // or $client->get('publisher');
$ consumer3 = $ client -> getConsumer (); // or $client->get('consumer');
$ rpcClientB = $ client -> getClientEndpoint (); // or $client->get('client.endpoint');
$ rpcServerB = $ client -> getServerEndpoint (); // or $client->get('server.endpoint');
// Use $client->gettable() to get an array of all available services. // Publisher Demo 1
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// exchangeOptions
' exchange ' => ' test.messages.exchange '
],
[
// bindOptions
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
],
[
// messageOptions
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ publisher -> connect ()-> queue ()-> exchange ()-> bind ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (2)
$ publisher -> prepare ();
foreach ( $ messages as $ message ) {
$ publisher -> publish ( $ message );
}
$ publisher -> disconnect ();
// Variant I (3)
$ publisher -> work ( $ messages ); // Publisher Demo 2
$ messages = [
' This is an example message. ID [1]. ' ,
' This is an example message. ID [2]. ' ,
' This is an example message. ID [3]. '
];
$ publisher = new Publisher ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ publisher -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ publisher -> connect ();
$ publisher -> queue ([
' queue ' => ' test.messages.queue '
]);
$ publisher -> exchange ([
' exchange ' => ' test.messages.exchange '
]);
$ publisher -> bind ([
' queue ' => ' test.messages.queue ' ,
' exchange ' => ' test.messages.exchange '
]);
foreach ( $ messages as $ message ) {
$ publisher -> publish (
[
' body ' => $ message ,
' properties ' => [
' content_type ' => ' text/plain ' ,
]
],
[
' exchange ' => ' test.messages.exchange '
]
);
}
$ publisher -> disconnect (); // Consumer Demo 1
$ consumer = new Consumer (
[
// connectionOptions
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
[
// channelOptions
],
[
// queueOptions
' queue ' => ' test.messages.queue '
],
[
// qosOptions
' exchange ' => ' test.messages.exchange '
],
[
// waitOptions
],
[
// consumeOptions
' queue ' => ' test.messages.queue ' ,
' callback ' => ' YourNamespaceYourClass::yourCallback ' ,
],
[
// publishOptions
' exchange ' => ' test.messages.exchange '
]
);
// Variant I (1)
$ consumer -> connect ();
$ consumer -> queue ();
$ consumer -> qos ();
$ consumer -> consume ();
$ consumer -> wait ();
$ consumer -> disconnect ();
// Variant I (2)
$ consumer -> prepare ()-> consume ()-> wait ()-> disconnect ();
// Variant I (3)
$ consumer -> work ( ' YourNamespaceYourClass::yourCallback ' ); // Consumer Demo 2
$ variable = ' This variable is needed in your callback. It will be the second, the first is always the message! ' ;
$ consumer = new Consumer ();
// connect() method does not take any parameters.
// Public assignment notation is used instead.
// Starting from v1.1.0, you can use getNewConnection(),
// setConnection(), getNewChannel(), and setChannel() instead.
$ consumer -> connectionOptions = [
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
];
$ consumer -> connect ();
$ consumer -> queue ([
' queue ' => ' test.messages.queue '
]);
$ consumer -> qos ([
' prefetch_count ' => 10
]);
$ consumer -> consume (
[
' YourNamespaceYourClass ' ,
' yourCallback '
],
[
$ variable
],
[
' queue ' => ' test.messages.queue '
]
);
$ consumer -> wait ();
$ consumer -> disconnect (); // RPC Client Demo 1
$ rpcClient = new ClientEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // RPC Client Demo 2
$ rpcClient = new ClientEndpoint ();
$ rpcClient -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ response = $ rpcClient -> request (
' {"command":"some-command","parameter":"some-parameter"} ' ,
' your.rpc.queue.name '
);
$ rpcClient -> disconnect (); // RPC Server Demo 1
$ rpcServer = new ServerEndpoint (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect (); // RPC Server Demo 2
$ rpcServer = new ServerEndpoint ();
$ rpcServer -> connect (
// connectionOptions
[
' host ' => ' localhost ' ,
' user ' => ' guest ' ,
' password ' => ' guest '
],
// queueName
' your.rpc.queue.name '
);
$ request = $ rpcServer -> respond (
' YourNamespaceYourClass::yourCallback ' ,
' your.rpc.queue.name '
);
$ rpcServer -> disconnect ();Fato: Ao fornecer parâmetros, fornece apenas os parâmetros que você precisa. O agente AMQP é inteligente o suficiente para anexar a deficiência.
Conselho: Você pode simplificar os construtores pesados escritos nos exemplos acima se você usar get($className) em uma instância da classe Client depois de fornecer um arquivo de configuração com os parâmetros desejados.
Nota: Consulte os documentos do agente AMQP para obter a explicação completa dos métodos. Consulte a Documentação do RabbitMQ e o PHP-AMQPLIB para obter a explicação completa dos parâmetros.
Nesses exemplos, você verá como trabalharia com o agente AMQP em um cenário do mundo real.
// Advanced Publisher Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Publisher ;
use MAKS AmqpAgent Helper Serializer ;
// Preparing some data to work with.
$ data = [];
for ( $ i = 0 ; $ i < 10000 ; $ i ++) {
$ data [] = [
' id ' => $ i ,
' importance ' => $ i % 3 == 0 ? ' high ' : ' low ' , // Tag 1/3 of the messages with high importance.
' text ' => ' Test message with ID ' . $ i
];
}
// Instantiating a config object.
// Note that not passing a config file path falls back to the default config.
// Starting from v1.2.2, you can use has(), get(), set() methods to modify config values.
$ config = new Config ();
// Instantiating a client.
$ client = new Client ( $ config );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a publisher from the client.
/** @var MAKSAmqpAgentWorkerPublisher */
$ publisher = $ client -> get ( ' publisher ' );
// Connecting to RabbitMQ server using the default config.
// host: localhost, port: 5672, username: guest, password: guest.
$ publisher -> connect ();
// Declaring high and low importance messages queue.
// Note that this queue is lazy and accept priority messages.
$ publisher -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ publisher -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Declaring a direct exchange to publish messages to.
$ publisher -> exchange ([
' exchange ' => ' high.and.low.importance.exchange ' ,
' type ' => ' direct '
]);
// Binding the queue with the exchange.
$ publisher -> bind ([
' queue ' => ' high.and.low.importance.queue ' ,
' exchange ' => ' high.and.low.importance.exchange '
]);
// Publishing messages according to their priority.
foreach ( $ data as $ item ) {
$ payload = $ serializer -> serialize ( $ item , ' JSON ' );
if ( $ item [ ' importance ' ] == ' high ' ) {
$ publisher -> publish (
[
' body ' => $ payload ,
' properties ' => [
' priority ' => 2
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
continue ;
}
$ publisher -> publish (
$ payload , // Not providing priority will fall back to 0
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
}
// Starting a new consumer after messages with high importance are consumed.
// Pay attention to the priority, this message will be placed just after
// high importance messages but before low importance messages.
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' start ' , ' consumer ' ),
' JSON '
),
' properties ' => [
' priority ' => 1
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
// Since we have two consumers now, one from the original worker
// and the other gets started later in the callback. We have
// to publish two channel-closing commands to stop the consumers.
// These will be added at the end after low importance messages.
$ iterator = 2 ;
do {
$ publisher -> publish (
[
' body ' => $ serializer -> serialize (
Publisher:: makeCommand ( ' close ' , ' channel ' ),
' JSON '
),
' properties ' => [
' priority ' => 0
],
],
[
' exchange ' => ' high.and.low.importance.exchange '
]
);
$ iterator --;
} while ( $ iterator != 0 );
// Close the connection with RabbitMQ server.
$ publisher -> disconnect (); // Advanced Consumer Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent Worker Consumer ;
use MAKS AmqpAgent Helper Serializer ;
use MAKS AmqpAgent Helper Logger ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving a logger from the client.
// And setting its write directory and filename.
/** @var MAKSAmqpAgentHelperLogger */
$ logger = $ client -> get ( ' logger ' );
$ logger -> setDirectory ( __DIR__ );
$ logger -> setFilename ( ' high-and-low-importance-messages ' );
// Retrieving a serializer from the client.
/** @var MAKSAmqpAgentHelperSerializer */
$ serializer = $ client -> get ( ' serializer ' );
// Retrieving a consumer from the client.
/** @var MAKSAmqpAgentWorkerConsumer */
$ consumer = $ client -> get ( ' consumer ' );
$ consumer -> connect ();
// Declaring high and low importance messages queue for the consumer.
// The declaration here must match the one on the publisher. This step
// can also be omitted if you're sure that the queue exists on the server.
$ consumer -> queue ([
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
]);
// Overwriting the default quality of service.
$ consumer -> qos ([
' prefetch_count ' => 1 ,
]);
// The callback is defined here for demonstration purposes
// Normally you should separate this in its own class.
$ callback = function ( $ message , & $ client , $ callback ) {
$ data = $ client -> getSerializer ()-> unserialize ( $ message -> body , ' JSON ' );
if (Consumer:: isCommand ( $ data )) {
Consumer:: ack ( $ message );
if (Consumer:: hasCommand ( $ data , ' close ' , ' channel ' )) {
// Giving time for acknowledgements to take effect,
// because the channel will be closed shortly
sleep ( 5 );
// Close the channel using the delivery info of the message.
Consumer:: shutdown ( $ message );
} elseif (Consumer:: hasCommand ( $ data , ' start ' , ' consumer ' )) {
$ consumer = $ client -> getConsumer ();
// Getting a new channel on the same connection.
$ channel = $ consumer -> getNewChannel ();
$ consumer -> queue (
[
' queue ' => ' high.and.low.importance.queue ' ,
' arguments ' => $ consumer -> arguments ([
' x-max-priority ' => 2 ,
' x-queue-mode ' => ' lazy '
])
],
$ channel
);
$ consumer -> qos (
[
' prefetch_count ' => 1 ,
],
$ channel
);
$ consumer -> consume (
$ callback ,
[
& $ client ,
$ callback
],
[
' queue ' => ' high.and.low.importance.queue ' ,
' consumer_tag ' => ' callback.consumer- ' . uniqid ()
],
$ channel
);
}
return ;
}
$ client -> getLogger ()-> write ( " ( { $ data [ ' importance ' ]} ) - { $ data [ ' text ' ]}" );
// Sleep for 50ms to mimic some processing.
usleep ( 50000 );
// The final step is acknowledgment so that no data is lost.
Consumer:: ack ( $ message );
};
$ consumer -> consume (
$ callback ,
[
& $ client , // Is used to refetch the consumer, serializer, and logger.
$ callback // This gets passed to the consumer that get started by the callback.
],
[
' queue ' => ' high.and.low.importance.queue '
]
);
// Here we have to wait using waitForAll() method
// because we have consumers that start dynamically.
$ consumer -> waitForAll ();
// Close the connection with RabbitMQ server.
$ consumer -> disconnect (); // Advanced RPC Client Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ClientEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC client endpoint from the client.
/** @var MAKSAmqpAgentRPCClientEndpoint */
$ rpcClient = $ client -> getClientEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcClient->on() and $rpcClient->getEvents() methods for more info.
$ rpcClient
-> on ( ' connection.after.open ' , function ( $ connection , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is now connected! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ connection instanceof AMQPStreamConnection) {
printf ( ' The connection has currently %d channel(s). ' , count ( $ connection -> channels ) - 1 );
}
})-> on ( ' request.before.send ' , function ( $ request , $ rpcClient , $ eventName ) {
printf ( ' %s has emitted [%s] event and is about to send a request! ' , get_class ( $ rpcClient ), $ eventName );
if ( $ request instanceof AMQPMessage) {
$ request -> set ( ' content_type ' , ' application/json ' )
printf ( ' The request content_type header has been set to: %s ' , $ request -> get ( ' content_type ' ));
}
});
// Optionally, you can ping the RabbitMQ server to see if a connection can be established.
$ roundtrip = $ rpcClient -> ping ();
$ rpcClient -> connect ();
$ response = $ rpcClient -> request ( ' {"command":"some-command","parameter":"some-parameter"} ' );
$ rpcClient -> disconnect (); // Advanced RPC Server Demo
use MAKS AmqpAgent Client ;
use MAKS AmqpAgent Config ;
use MAKS AmqpAgent RPC ServerEndpoint ;
$ config = new Config ();
$ client = new Client ( $ config );
// Retrieving an RPC server from the client.
/** @var MAKSAmqpAgentRPCServerEndpoint */
$ rpcServer = $ client -> getServerEndpoint ();
// Attaching some additional functionality based on events emitted by the endpoint.
// See $rpcServer->on() and $rpcServer->getEvents() methods for more info.
$ rpcServer
-> on ( ' request.on.get ' , function ( $ request , $ rpcServer , $ eventName ) {
printf ( ' %s has emitted [%s] event and has just got a request! ' , get_class ( $ rpcServer ), $ eventName );
if ( $ request instanceof AMQPMessage) {
printf ( ' The request has the following body: %s ' , $ request -> body ;
}
});
$ rpcServer -> connect ();
$ request = $ rpcServer -> respond ( ' YourNamespaceYourClass::yourCallback ' );
$ rpcServer -> disconnect ();Fato: você pode tornar o código nos exemplos avançados do Publisher/Consumer muito mais fácil se fizer alterações de todos os parâmetros em um arquivo de configuração e passá -lo para o cliente em vez do padrão.
Conselho: A base de código do agente AMQP está bem documentada, consulte este link para examinar todas as classes e métodos.
O agente AMQP é um pacote de código aberto licenciado no GNU LGPL v2.1 devido à licença php-amqplib.
Copyright (C) 2020 Marwan al-Soltany. Todos os direitos reservados.