PHP-rdkafka 是一個基於 librdkafka 的穩定、可用於生產且快速的PHP Kafka 用戶端。
目前版本支援 PHP >= 8.1.0、librdkafka >= 1.5.3、Kafka >= 0.8。版本 6.x 支援 PHP 7.x..8.x、librdkafka 0.11..2.x。舊版支援 PHP 5。
擴展的目標是成為一個專注於生產和長期支援的低階非固定 librdkafka 綁定。
支援高級和低階消費者、生產者和元資料API。
文件可在此處取得。
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html
下面使用的設定參數可以在Librdkafka配置參考中找到
對於生產,我們首先需要創建一個生產者,並在其中添加代理(Kafka 伺服器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Producer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1:9092,10.0.0.2:9092 " );警告確保您的生產者遵循正確的關閉(見下文)以免丟失訊息。
接下來,我們從生產者建立一個主題實例:
<?php
$ topic = $ rk -> newTopic ( " test " );從那裡,我們可以使用 Produce 方法產生任意數量的消息:
<?php
$ topic -> produce ( RD_KAFKA_PARTITION_UA , 0 , " Message payload " );第一個參數是分區。 RD_KAFKA_PARTITION_UA 代表unsigned ,讓 librdkafka 選擇分割區。
第二個參數是訊息標誌,應該是 0
或RD_KAFKA_MSG_F_BLOCK以阻止滿隊列上的生產。訊息有效負載可以是任何內容。
這應該在銷毀生產者實例之前完成
確保所有排隊和進行中的產品請求均已完成
在終止之前。為$timeout_ms使用合理的值。
警告不呼叫flush可能會導致訊息遺失!
$ rk -> flush ( $ timeout_ms );如果您不關心發送尚未發送的訊息,可以在呼叫flush() purge() :
// Forget messages that are not fully sent yet
$ rk -> purge ( RD_KAFKA_PURGE_F_QUEUE );
$ rk -> flush ( $ timeout_ms );RdKafkaKafkaConsumer 類別支援自動分區分配/撤銷。請參閱此處的範例。
注意低階消費者是舊版 API,請優先使用高階消費者
我們首先需要創建一個低階的消費者,並在其中加入代理(Kafka 伺服器):
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' log_level ' , ( string ) LOG_DEBUG );
$ conf -> set ( ' debug ' , ' all ' );
$ rk = new RdKafka Consumer ( $ conf );
$ rk -> addBrokers ( " 10.0.0.1,10.0.0.2 " );接下來,透過呼叫newTopic()方法來建立主題實例,並開始在分區 0 上消費:
<?php
$ topic = $ rk -> newTopic ( " test " );
// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_BEGINNING );接下來,檢索消費的訊息:
<?php
while ( true ) {
// The first argument is the partition (again).
// The second argument is the timeout.
$ msg = $ topic -> consume ( 0 , 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}注意低階消費者是舊版 API,請優先使用高階消費者
從多個主題和/或分區消費可以透過告訴 librdkafka 將所有訊息從這些主題/分區轉發到內部佇列,然後從該佇列消費來完成:
建立隊列:
<?php
$ queue = $ rk -> newQueue ();將主題分區加入佇列:
<?php
$ topic1 = $ rk -> newTopic ( " topic1 " );
$ topic1 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic1 -> consumeQueueStart ( 1 , RD_KAFKA_OFFSET_BEGINNING , $ queue );
$ topic2 = $ rk -> newTopic ( " topic2 " );
$ topic2 -> consumeQueueStart ( 0 , RD_KAFKA_OFFSET_BEGINNING , $ queue );接下來,從佇列中檢索已消費的訊息:
<?php
while ( true ) {
// The only argument is the timeout.
$ msg = $ queue -> consume ( 1000 );
if ( null === $ msg || $ msg -> err === RD_KAFKA_RESP_ERR__PARTITION_EOF ) {
// Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
continue ;
} elseif ( $ msg -> err ) {
echo $ msg -> errstr (), "n" ;
break ;
} else {
echo $ msg -> payload , "n" ;
}
}librdkafka 預設在代理程式上儲存偏移量。
如果您使用本機檔案進行偏移存儲,則預設情況下該檔案會在目前目錄中創建,其名稱基於主題和分割區。可以透過設定offset.store.path配置屬性來變更目錄。
若要手動控制偏移量,請將enable.auto.offset.store設定為false 。
設定auto.commit.interval.ms和auto.commit.enable將控制
儲存的偏移量是否將自動提交給代理以及在哪個時間間隔。
若要手動控制偏移量,請將enable.auto.commit設定為false 。
為高階消費者消費訊息的呼叫之間允許的最大時間。
如果超過此間隔,則消費者被視為失敗,並且該組將
重新平衡,以便將分區重新分配給另一個消費者群組成員。
group.id負責設定您的消費者群組 ID,它應該是唯一的(並且不應更改)。 Kafka 使用它來識別應用程式並為其儲存偏移量。
<?php
$ topicConf = new RdKafka TopicConf ();
$ topicConf -> set ( " auto.commit.interval.ms " , 1e3 );
$ topic = $ rk -> newTopic ( " test " , $ topicConf );
$ topic -> consumeStart ( 0 , RD_KAFKA_OFFSET_STORED );Librdkafka 配置參考
預設情況下,librdkafka 將為每個使用的分割區緩衝最多 1GB 的訊息。您可以透過減少使用者的queued.max.messages.kbytes參數的值來降低記憶體使用量。
每個消費者和生產者實例將依照topic.metadata.refresh.interval.ms參數定義的時間間隔取得主題元資料。根據您的 librdkafka 版本,該參數預設為 10 秒或 600 秒。
librdkafka 預設會取得叢集所有主題的元資料。將topic.metadata.refresh.sparse設為字串"true"可確保 librdkafka 僅取得他使用的主題。
將topic.metadata.refresh.sparse設為"true" ,並將topic.metadata.refresh.interval.ms設為 600 秒(加上一些抖動)可以大量減少頻寬,具體取決於消費者和主題的數量。
此設定允許 librdkafka 執行緒在 librdkafka 處理完畢後立即終止。這有效地允許您的 PHP 進程/請求快速終止。
啟用此功能時,您必須像這樣屏蔽訊號:
<?php
// once
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
// any time
$ conf -> set ( ' internal.termination.signal ' , SIGIO );代理套接字操作可能阻塞的最長時間。較低的值可以提高反應能力,但代價是 CPU 使用率稍高。
減小該設定的值可提高關閉速度。該值定義 librdkafka 在讀取循環的一次迭代中阻塞的最長時間。這也定義了 librdkafka 主執行緒檢查終止的頻率。
這定義了 librdkafka 在發送一批訊息之前等待的最大時間和預設時間。將此設定減少到例如 1ms 可確保訊息盡快發送,而不是大量發送。
這已被證明可以減少 rdkafka 實例以及 PHP 進程/請求的關閉時間。
這是針對低延遲進行最佳化的配置。這允許 PHP 進程/請求盡快發送訊息並快速終止。
<?php
$ conf = new RdKafka Conf ();
$ conf -> set ( ' socket.timeout.ms ' , 50 ); // or socket.blocking.max.ms, depending on librdkafka version
if ( function_exists ( ' pcntl_sigprocmask ' )) {
pcntl_sigprocmask ( SIG_BLOCK , array ( SIGIO ));
$ conf -> set ( ' internal.termination.signal ' , SIGIO );
} else {
$ conf -> set ( ' queue.buffering.max.ms ' , 1 );
}
$ producer = new RdKafka Producer ( $ conf );
$ consumer = new RdKafka Consumer ( $ conf );建議定期呼叫 poll 來提供回呼服務。在php-rdkafka:3.x中
poll 在關閉期間也會被調用,因此不定期調用它可能會
導致停機時間稍長。下面的範例進行輪詢,直到佇列中不再有事件為止:
$producer->produce(...);
while ($producer->getOutQLen() > 0) {
$producer->poll(1);
}
https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
文件的來源可以在這裡找到
如果文件還不夠,請隨時在 Gitter 或 Google Groups 上的 php-rdkafka 頻道上提問。
因為您的 IDE 無法自動發現 php-rdkadka api,您可以考慮使用外部套件,為 php-rdkafka 類別、函數和常數提供一組存根:kwn/php-rdkafka-stubs
如果您願意貢獻,謝謝:)
在開始之前,請查看貢獻文檔,以了解如何合併您的變更。
從 librdkafka 複製的文檔。
作者:參見貢獻者。
php-rdkafka 在 MIT 許可證下發布。