PHP-rdkafka 是一个基于 librdkafka 的稳定、可用于生产且快速的PHP Kafka 客户端。
当前版本支持 PHP >= 8.1.0、librdkafka >= 1.5.3、Kafka >= 0.8。版本 6.x 支持 PHP 7.x..8.x、librdkafka 0.11..2.x。旧版本支持 PHP 5。
扩展的目标是成为一个专注于生产和长期支持的低级非固定 librdkafka 绑定。
支持高级和低级消费者、生产者和元数据API。
文档可在此处获取。
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
下面使用的配置参数可以在Librdkafka配置参考中找到
对于生产,我们首先需要创建一个生产者,并向其中添加代理(Kafka 服务器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );警告确保您的生产者遵循正确的关闭(见下文)以免丢失消息。
接下来,我们从生产者创建一个主题实例:
<?php
$ topic = $ rk -> newTopic ( " test " );从那里,我们可以使用 Produce 方法生成任意数量的消息:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );第一个参数是分区。 RD_KAFKA_PARTITION_UA 代表unsigned ,让 librdkafka 选择分区。
第二个参数是消息标志,应该是 0
或RD_KAFKA_MSG_F_BLOCK以阻止满队列上的生产。消息有效负载可以是任何内容。
这应该在销毁生产者实例之前完成
确保所有排队和正在进行的生产请求均已完成
在终止之前。为$timeout_ms使用合理的值。
警告不调用flush可能会导致消息丢失!
$ rk -> flush ( $ timeout_ms );如果您不关心发送尚未发送的消息,可以在调用flush() purge() :
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );RdKafkaKafkaConsumer 类支持自动分区分配/撤销。请参阅此处的示例。
注意低级别消费者是旧版 API,请优先使用高级消费者
我们首先需要创建一个低级消费者,并向其中添加代理(Kafka 服务器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );接下来,通过调用newTopic()方法创建一个主题实例,并开始在分区 0 上消费:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );接下来,检索消费的消息:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}注意低级别消费者是旧版 API,请优先使用高级消费者
从多个主题和/或分区消费可以通过告诉 librdkafka 将所有消息从这些主题/分区转发到内部队列,然后从该队列消费来完成:
创建队列:
<?php
$ queue = $ rk -> newQueue ();将主题分区添加到队列中:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );接下来,从队列中检索已消费的消息:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}librdkafka 默认在代理上存储偏移量。
如果您使用本地文件进行偏移存储,则默认情况下该文件会在当前目录中创建,其名称基于主题和分区。可以通过设置offset.store.path配置属性来更改目录。
要手动控制偏移量,请将enable.auto.offset.store设置为false 。
设置auto.commit.interval.ms和auto.commit.enable将控制
存储的偏移量是否将自动提交给代理以及在哪个时间间隔。
要手动控制偏移量,请将enable.auto.commit设置为false 。
为高级消费者消费消息的调用之间允许的最大时间。
如果超过此间隔,则消费者被视为失败,并且该组将
重新平衡,以便将分区重新分配给另一个消费者组成员。
group.id负责设置您的消费者组 ID,它应该是唯一的(并且不应更改)。 Kafka 使用它来识别应用程序并为其存储偏移量。
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );Librdkafka 配置参考
默认情况下,librdkafka 将为每个使用的分区缓冲最多 1GB 的消息。您可以通过减少使用者上的queued.max.messages.kbytes参数的值来降低内存使用量。
每个消费者和生产者实例将按照topic.metadata.refresh.interval.ms参数定义的时间间隔获取主题元数据。根据您的 librdkafka 版本,该参数默认为 10 秒或 600 秒。
librdkafka 默认获取集群所有主题的元数据。将topic.metadata.refresh.sparse设置为字符串"true"可确保 librdkafka 仅获取他使用的主题。
将topic.metadata.refresh.sparse设置为"true" ,并将topic.metadata.refresh.interval.ms设置为 600 秒(加上一些抖动)可以大量减少带宽,具体取决于消费者和主题的数量。
此设置允许 librdkafka 线程在 librdkafka 处理完毕后立即终止。这有效地允许您的 PHP 进程/请求快速终止。
启用此功能时,您必须像这样屏蔽信号:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );代理套接字操作可能阻塞的最长时间。较低的值可提高响应能力,但代价是 CPU 使用率稍高。
减小该设置的值可提高关闭速度。该值定义 librdkafka 在读循环的一次迭代中阻塞的最长时间。这也定义了 librdkafka 主线程检查终止的频率。
这定义了 librdkafka 在发送一批消息之前等待的最大时间和默认时间。将此设置减少到例如 1ms 可确保消息尽快发送,而不是批量发送。
这已被证明可以减少 rdkafka 实例以及 PHP 进程/请求的关闭时间。
这是针对低延迟进行优化的配置。这允许 PHP 进程/请求尽快发送消息并快速终止。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );建议定期调用 poll 来提供回调服务。在php-rdkafka:3.x中
poll 在关闭期间也会被调用,因此不定期调用它可能会
导致停机时间稍长一些。下面的示例进行轮询,直到队列中不再有事件为止:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
文档的来源可以在这里找到
如果文档还不够,请随时在 Gitter 或 Google Groups 上的 php-rdkafka 频道上提问。
因为您的 IDE 无法自动发现 php-rdkadka api,您可以考虑使用外部包,为 php-rdkafka 类、函数和常量提供一组存根:kwn/php-rdkafka-stubs
如果您愿意贡献,谢谢:)
在开始之前,请查看贡献文档,了解如何合并您的更改。
从 librdkafka 复制的文档。
作者:参见贡献者。
php-rdkafka 在 MIT 许可证下发布。