PhpKafka
0.2.4
เพิ่มเป็นนักแต่งเพลงขึ้นอยู่กับ:
composer require simpod/kafka ค่าคงที่การกำหนดค่าบางอย่างมีให้เช่น ConsumerConfig , ProducerConfig หรือ CommonClientConfigs
อย่างไรก็ตามพวกเขาถูกคัดลอกมาจาก Java API และไม่สามารถใช้ได้กับ Liblkafka ปรึกษากับเอกสาร Liblkafka ก่อนการใช้งาน
KafkaConsumer Boilerplate สามารถใช้ได้กับวิธี startBatch() (เพื่อลดตัวอย่างนี้ใน libldkafka) และด้วย start() พวกเขายังจัดการสัญญาณการเลิกจ้างให้คุณ
<?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig (), Logger:: get ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> start (
120 * 1000 ,
static function ( Message $ message ) use ( $ kafkaConsumer ) : void {
// Process message here
$ kafkaConsumer -> commit ( $ message ); // Autocommit is disabled
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
} <?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer ConsumerRecords ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleBatchConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> startBatch (
200000 ,
120 * 1000 ,
static function ( Message $ message ): void {
// Process record
},
static function ( ConsumerRecords $ consumerRecords ) use ( $ kafkaConsumer ) : void {
// Process records batch
$ kafkaConsumer -> commit ( $ consumerRecords -> getLast ());
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
}