PhpKafka
0.2.4
Tambahkan sebagai ketergantungan komposer:
composer require simpod/kafka Beberapa konstanta konfigurasi disediakan seperti ConsumerConfig , ProducerConfig atau CommonClientConfigs .
Namun, mereka disalin dari Java API dan tidak semua berlaku untuk Libldkafka. Konsultasikan dengan dokumentasi LibrDKafka sebelum digunakan.
Boilerplate KafkaConsumer tersedia dengan metode startBatch() (untuk menggantikan contoh ini di libtkafka) dan dengan start() . Mereka juga menangani sinyal terminasi untuk Anda.
<?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig (), Logger:: get ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> start (
120 * 1000 ,
static function ( Message $ message ) use ( $ kafkaConsumer ) : void {
// Process message here
$ kafkaConsumer -> commit ( $ message ); // Autocommit is disabled
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
} <?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer ConsumerRecords ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleBatchConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> startBatch (
200000 ,
120 * 1000 ,
static function ( Message $ message ): void {
// Process record
},
static function ( ConsumerRecords $ consumerRecords ) use ( $ kafkaConsumer ) : void {
// Process records batch
$ kafkaConsumer -> commit ( $ consumerRecords -> getLast ());
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
}