PhpKafka
0.2.4
작곡가 종속성으로 추가 :
composer require simpod/kafka ConsumerConfig , ProducerConfig 또는 CommonClientConfigs 와 같은 일부 구성 상수가 제공됩니다.
그러나 Java API에서 복사되며 Librdkafka에 모두 적용되는 것은 아닙니다. 사용하기 전에 Librdkafka 문서를 참조하십시오.
KafkaConsumer 보일러 플레이트는 startBatch() 메소드 (Librdkafka 에서이 예제)와 start() 와 함께 사용할 수 있습니다. 그들은 또한 당신을 위해 종료 신호를 처리합니다.
<?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig (), Logger:: get ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> start (
120 * 1000 ,
static function ( Message $ message ) use ( $ kafkaConsumer ) : void {
// Process message here
$ kafkaConsumer -> commit ( $ message ); // Autocommit is disabled
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
} <?php
declare (strict_types= 1 );
namespace Your AppNamespace ;
use RdKafka Message ;
use SimPod Kafka Clients Consumer ConsumerConfig ;
use SimPod Kafka Clients Consumer ConsumerRecords ;
use SimPod Kafka Clients Consumer KafkaConsumer ;
final class ExampleBatchConsumer
{
public function run (): void
{
$ kafkaConsumer = new KafkaConsumer ( $ this -> getConfig ());
$ kafkaConsumer -> subscribe ([ ' topic1 ' ]);
$ kafkaConsumer -> startBatch (
200000 ,
120 * 1000 ,
static function ( Message $ message ): void {
// Process record
},
static function ( ConsumerRecords $ consumerRecords ) use ( $ kafkaConsumer ) : void {
// Process records batch
$ kafkaConsumer -> commit ( $ consumerRecords -> getLast ());
}
);
}
private function getConfig (): ConsumerConfig
{
$ config = new ConsumerConfig ();
$ config -> set (ConsumerConfig:: BOOTSTRAP_SERVERS_CONFIG , ' 127.0.0.1:9092 ' );
$ config -> set (ConsumerConfig:: ENABLE_AUTO_COMMIT_CONFIG , false );
$ config -> set (ConsumerConfig:: CLIENT_ID_CONFIG , gethostname ());
$ config -> set (ConsumerConfig:: AUTO_OFFSET_RESET_CONFIG , ' earliest ' );
$ config -> set (ConsumerConfig:: GROUP_ID_CONFIG , ' consumer_group_name ' );
return $ config ;
}
}