PhpKafka
0.2.4
Fügen Sie als Komponistenabhängigkeit hinzu:
composer require simpod/kafka Einige Konfigurationskonstanten werden wie ConsumerConfig , ProducerConfig oder CommonClientConfigs bereitgestellt.
Sie werden jedoch von der Java -API kopiert und nicht alle sind für librdkafka anwendbar. Wenden Sie sich vor dem Gebrauch mit Librdkafka -Dokumentation.
KafkaConsumer Boilerplate ist mit startBatch() -Methode (um dieses Beispiel in librdkafka) und mit start() erhältlich zu sein. Sie verarbeiten auch Terminierungssignale für Sie.
<?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig (), Logger:: get ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> start (
120 * 1000 ,
static function ( Message $ message ) use ( $ kafkaConsumer ) : void {
// Process message here
$ kafkaConsumer -> commit ( $ message ); // Autocommit is disabled
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
} <?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer ConsumerRecords ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleBatchConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> startBatch (
200000 ,
120 * 1000 ,
static function ( Message $ message ): void {
// Process record
},
static function ( ConsumerRecords $ consumerRecords ) use ( $ kafkaConsumer ) : void {
// Process records batch
$ kafkaConsumer -> commit ( $ consumerRecords -> getLast ());
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
}