PhpKafka
0.2.4
作曲家の依存関係として追加:
composer require simpod/kafka一部の構成定数は、 ConsumerConfig 、 ProducerConfig 、またはCommonClientConfigsのように提供されます。
ただし、それらはJava APIからコピーされており、すべてがLibrdkafkaに適用できるわけではありません。使用する前に、Librdkafkaのドキュメントに相談してください。
KafkaConsumerボイラープレートは、 startBatch()メソッド(Librdkafkaでこの例を抑えるために)およびstart()で利用できます。また、終了信号も処理します。
<?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig (), Logger:: get ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> start (
120 * 1000 ,
static function ( Message $ message ) use ( $ kafkaConsumer ) : void {
// Process message here
$ kafkaConsumer -> commit ( $ message ); // Autocommit is disabled
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
} <?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer ConsumerRecords ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleBatchConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> startBatch (
200000 ,
120 * 1000 ,
static function ( Message $ message ): void {
// Process record
},
static function ( ConsumerRecords $ consumerRecords ) use ( $ kafkaConsumer ) : void {
// Process records batch
$ kafkaConsumer -> commit ( $ consumerRecords -> getLast ());
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
}