<?php

declare(strict_types=1);

/**
 * Consumes one fixed-size batch from a single partition of `my_topic_name`
 * with the low-level RdKafka consumer.
 *
 * Usage: php <script> [partition = 0] [batchSize = 1000]
 *
 * Flow:
 *   1. Probe the message the consumer group would resume from (stored offset).
 *   2. Probe the newest message in the partition.
 *   3. When at least one full batch is pending, consume it and store the offset
 *      of the last successfully delivered message.
 *
 * Exit status: 0 when a batch was consumed or there was simply not enough to
 * consume, 1 on invalid arguments, timeouts or broker errors.
 */

/**
 * Reads exactly one message starting at $startOffset, then stops consuming again.
 *
 * A partition-EOF event is returned to the caller (it is not a failure); a
 * timeout or any other error terminates the script with exit status 1.
 *
 * @param RdKafka\ConsumerTopic $topic       Topic handle to read from.
 * @param int                   $partition   Partition to read from.
 * @param int                   $startOffset Absolute or logical (RD_KAFKA_OFFSET_*) start offset.
 * @param int                   $timeoutMs   Maximum time to wait for the message, in milliseconds.
 *
 * @return RdKafka\Message Either a regular message or a partition-EOF event.
 */
function probeMessage(RdKafka\ConsumerTopic $topic, int $partition, int $startOffset, int $timeoutMs): RdKafka\Message
{
    $topic->consumeStart($partition, $startOffset);
    try {
        $message = $topic->consume($partition, $timeoutMs);
    } finally {
        // Always stop, otherwise a failed probe leaves the partition consumer running.
        $topic->consumeStop($partition);
    }

    // consume() yields null when nothing arrived within the timeout.
    if (null === $message) {
        printf("Message consume error: no message received within %d ms\n", $timeoutMs);
        exit(1);
    }

    $isEof = RD_KAFKA_RESP_ERR__PARTITION_EOF === $message->err;
    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err && !$isEof) {
        printf("Message consume error: %d\n", $message->err);
        exit(1);
    }

    return $message;
}

// arguments
$partition = (int) ($argv[1] ?? 0);
$batchSize = (int) ($argv[2] ?? 1000);
$timeoutMs = 10_000;
printf("Consume from partition: %d\n", $partition);

if ($batchSize < 1) {
    printf("Batch size must be a positive integer, got: %d\n", $batchSize);
    exit(1);
}

// configure
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my_customer_group_id');
$conf->set('metadata.broker.list', 'kafka'); // docker container name
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.partition.eof', 'true');
// Offsets are stored explicitly below, only after a whole batch has been read.
$conf->set('enable.auto.offset.store', 'false');

$consumer = new RdKafka\Consumer($conf);
$topic = $consumer->newTopic('my_topic_name');

// Number of messages between the resume position and the newest message.
// Stays 0 when either probe reports partition EOF (nothing left to read).
$messagesBehindCnt = 0;

// retrieve stored offset
$currentMessage = probeMessage($topic, $partition, RD_KAFKA_OFFSET_STORED, $timeoutMs);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $currentMessage->err) {
    $currentMessageOffset = $currentMessage->offset;
    printf("Current message offset: %d\n", $currentMessageOffset);

    // retrieve last offset
    $lastMessage = probeMessage($topic, $partition, rd_kafka_offset_tail(1), $timeoutMs);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $lastMessage->err) {
        $lastMessageOffset = $lastMessage->offset;
        printf("Last message offset: %d\n", $lastMessageOffset);

        // Both the first unconsumed message and the newest one are still
        // pending, so the range is inclusive on both ends.
        $messagesBehindCnt = $lastMessageOffset - $currentMessageOffset + 1;
    }
    unset($lastMessage);
}
unset($currentMessage);

printf("Messages waiting for consumption: \033[32m%d\033[0m\n", $messagesBehindCnt);
if ($messagesBehindCnt < $batchSize) {
    printf("\033[31mNot enough messages for batch: %d\033[0m\n", $messagesBehindCnt);
    exit(0);
}

// consume batch
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
try {
    $messagesList = $topic->consumeBatch($partition, $timeoutMs, $batchSize);

    // Ignore error/EOF events: only a really delivered message may move the offset.
    $deliveredList = array_filter(
        $messagesList,
        static fn (RdKafka\Message $message): bool => RD_KAFKA_RESP_ERR_NO_ERROR === $message->err,
    );

    if ($deliveredList) {
        $lastMessage = end($deliveredList);
        printf("Commit offset: \033[32m%d\033[0m\n", $lastMessage->offset);
        $topic->offsetStore($partition, $lastMessage->offset);
    }
} finally {
    $topic->consumeStop($partition);
}