PHP rdkafka

A low-level Kafka consumer command and a producer service for Symfony 4, built on the php-rdkafka extension, together with a Docker Compose stack (two Kafka brokers plus ZooKeeper) for local development.
composer.json
{
  "name": "blabla",
  "type": "project",
  "config": {
    "preferred-install": {
      "*": "dist"
    },
    "sort-packages": true
  },
  "require": {
    "php": "^7.1.3",
    "ext-rdkafka": "*",
    "symfony/console": "^4.0",
    "symfony/framework-bundle": "^4.0"
  },
  "require-dev": {
    "kwn/php-rdkafka-stubs": "^1.0"
  }
}
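Composer can only assert that ext-rdkafka is present; the extension itself has to be installed via PECL (see the Dockerfile below). A minimal runtime sanity check, using only core PHP functions:

<?php
// Fail fast when the rdkafka extension is not loaded.
if (!extension_loaded('rdkafka')) {
    fwrite(STDERR, 'ext-rdkafka is missing; install it with `pecl install rdkafka`' . PHP_EOL);
    exit(1);
}
echo 'ext-rdkafka version: ' . phpversion('rdkafka') . PHP_EOL;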
ConsumeCommand.php
<?php
declare(strict_types = 1);

namespace App\Kafka;

use RdKafka;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class ConsumeCommand extends ContainerAwareCommand
{
    private const CONSUME_MAX_WAIT_TIME = 15 * 1000;
    private const CONF_COMPRESSION_CODEC = 'gzip';
    private const CONF_GROUP_ID = 'basic-lowLevel-consumer';
    private const TOPIC_CONF_AUTO_COMMIT_INTERVAL = '5000';
    private const TOPIC_CONF_OFFSET_STORE_METHOD = 'file'; // file|broker
    private const TOPIC_CONF_OFFSET_RESET = 'smallest'; // smallest|largest

    /** @var string */
    private $brokersList;

    /** @var int */
    private $logLevel;

    public function __construct(string $brokersList, int $logLevel)
    {
        parent::__construct();
        $this->brokersList = $brokersList;
        $this->logLevel = $logLevel;
    }

    protected function configure()
    {
        $this
            ->setName('app:kafka:consume')
            ->setDescription('Low-level consumer of Kafka topic')
            ->addArgument('topic', InputArgument::REQUIRED, 'Name of the topic to consume')
            ->addOption('offset', 'o', InputOption::VALUE_OPTIONAL)
            ->addOption('partition', 'p', InputOption::VALUE_OPTIONAL);
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $topicName = $input->getArgument('topic');
        // cast after null-coalescing; casting first would turn NULL into 0 and the fallback could never apply
        $offset = (int) ($input->getOption('offset') ?? RD_KAFKA_OFFSET_BEGINNING);
        $partition = (int) ($input->getOption('partition') ?? RD_KAFKA_PARTITION_UA);
        $output->writeln("Starting consuming topic '{$topicName}' from offset {$offset}, partition {$partition}");

        $conf = $this->getConf();
        $rk = new RdKafka\Consumer($conf);
        $rk->setLogLevel($this->logLevel);
        $rk->addBrokers($this->brokersList);

        $topicConf = $this->getTopicConf();
        $topic = $rk->newTopic($topicName, $topicConf);
        $topic->consumeStart($partition, $offset);

        while (TRUE) {
            $message = $topic->consume($partition, self::CONSUME_MAX_WAIT_TIME);
            $messageErr = (is_object($message)) ? $message->err : RD_KAFKA_RESP_ERR_UNKNOWN;
            switch ($messageErr) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    print_r($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    continue 2; // continue while; nothing consumed, so don't store an offset
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    continue 2; // continue while
                case RD_KAFKA_RESP_ERR_UNKNOWN:
                    continue 2; // continue while
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
            $topic->offsetStore($message->partition, $message->offset); // schedule offset store after successfully consuming the message
        }
    }

    private function getConf(): RdKafka\Conf
    {
        $conf = new RdKafka\Conf();
        $conf->set('compression.codec', self::CONF_COMPRESSION_CODEC);
        $conf->set('group.id', self::CONF_GROUP_ID); // required when storing offsets
        return $conf;
    }

    private function getTopicConf(): RdKafka\TopicConf
    {
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.commit.enable', 'false'); // don't commit offset automatically
        $topicConf->set('auto.commit.interval.ms', self::TOPIC_CONF_AUTO_COMMIT_INTERVAL);
        $topicConf->set('offset.store.method', self::TOPIC_CONF_OFFSET_STORE_METHOD);
        if (self::TOPIC_CONF_OFFSET_STORE_METHOD === 'file') {
            $topicConf->set('offset.store.path', sys_get_temp_dir());
        }
        // where to start consuming messages when there is no initial offset in offset store or the desired offset is out of range
        $topicConf->set('auto.offset.reset', self::TOPIC_CONF_OFFSET_RESET);
        return $topicConf;
    }
}
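To try the command without booting a full Symfony kernel, it can be attached to a bare console application. A minimal sketch; the entry-point path, broker list, and use of PHP's LOG_INFO constant as the log level are assumptions, not part of the gist:

<?php
// bin/consume.php – hypothetical standalone entry point
require __DIR__ . '/../vendor/autoload.php';

use App\Kafka\ConsumeCommand;
use Symfony\Component\Console\Application;

$app = new Application('kafka-demo');
$app->add(new ConsumeCommand('kafka1:9092,kafka2:9092', LOG_INFO));
$app->run();

It would then be invoked as `php bin/consume.php app:kafka:consume my-topic -p 0 -o 0`.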
docker-compose.yml
version: '3'
services:
  php:
    build:
      context: .
      args:
        APP_DEBUG: $APP_DEBUG
        APP_ENV: $APP_ENV
    image: srigi/php

  kafka1:
    image: wurstmeister/kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_BROKER_ID: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  # required by this image; the broker won't start without it
    depends_on:
      - zookeeper
    volumes:
      - kafka1:/kafka

  kafka2:
    image: wurstmeister/kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_BROKER_ID: 2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    volumes:
      - kafka2:/kafka

  zookeeper:
    image: zookeeper:3.4
    volumes:
      - zookeeper-data:/data
      - zookeeper-datalog:/datalog

volumes:
  kafka1:
  kafka2:
  zookeeper-data:
  zookeeper-datalog:
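Once the stack is up (`docker-compose up -d`), a quick probe run inside the php container can confirm that both brokers registered. A sketch using the extension directly, with the broker addresses matching the service names above:

<?php
// probe.php – hypothetical smoke test, not part of the gist
$rk = new RdKafka\Producer();
$rk->addBrokers('kafka1:9092,kafka2:9092');

// fetch cluster metadata for all topics, with a 15s timeout
$metadata = $rk->getMetadata(TRUE, NULL, 15 * 1000);
foreach ($metadata->getBrokers() as $broker) {
    printf("broker %d at %s:%d\n", $broker->getId(), $broker->getHost(), $broker->getPort());
}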
Dockerfile
FROM php:7.1-fpm

ENV DEBIAN_FRONTEND=noninteractive

# install system libs & PHP extensions
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        librdkafka-dev \
        unzip \
    && pecl install \
        rdkafka \
    && docker-php-ext-enable \
        rdkafka

# install composer
RUN php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');" \
    && php -r "if (hash_file('SHA384', 'composer-setup.php') === '544e09ee996cdf60ece3804abc52599c22b1f40f4323403c44d44fdfdd586475ca9813a858088ffbc1f233e9b180f061') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;" \
    && php composer-setup.php --filename=composer --install-dir=/usr/local/bin \
    && php -r "unlink('composer-setup.php');"

# Prepare app workdirs, switch to unprivileged user
WORKDIR /app
RUN mkdir -p \
        var/cache \
        var/logs \
        var/sessions \
    && chown -R www-data:www-data \
        /app \
    && chown www-data:www-data \
        /var/www
USER www-data
RUN composer global require hirak/prestissimo

# Install app dependencies
ARG APP_DEBUG=0
ENV APP_DEBUG=$APP_DEBUG
ARG APP_ENV=prod
ENV APP_ENV=$APP_ENV
COPY ./composer.json ./composer.lock ./
RUN composer install --no-autoloader --no-interaction --no-scripts --no-suggest \
    && composer clearcache

# Copy app sources & initialize app
COPY ./app ./app/
COPY ./public ./public/
RUN composer dump-autoload --optimize \
    && composer clearcache
Producer.php
<?php
declare(strict_types = 1);

namespace App\Kafka;

use JMS\Serializer\SerializationContext;
use JMS\Serializer\SerializerInterface;
use Psr\Log\LoggerInterface;
use RdKafka;

class Producer
{
    protected const AFTER_PRODUCE_RECOVER_TIMEOUT = 50;
    protected const MSG_FLAGS = 0;

    /** @var string */
    private $brokersList;

    /** @var int */
    private $logLevel;

    /** @var LoggerInterface */
    private $logger;

    /** @var RdKafka\Producer */
    private $producer;

    /** @var SerializerInterface */
    private $serializer;

    /**
     * @param string $brokersList Comma-separated list of brokers in the format: <broker1>,<broker2>,...
     * @param int $logLevel Specifies the maximum logging level produced by internal kafka logging and debugging
     * @param LoggerInterface $logger
     * @param SerializerInterface $serializer
     */
    public function __construct(string $brokersList, int $logLevel, LoggerInterface $logger, SerializerInterface $serializer)
    {
        $this->brokersList = $brokersList;
        $this->logLevel = $logLevel;
        $this->logger = $logger;
        $this->serializer = $serializer;
    }

    protected function connect(): RdKafka\Producer
    {
        // lazily create the underlying producer on first use
        if ($this->producer === NULL) {
            $conf = new RdKafka\Conf();
            $conf->set('compression.codec', 'gzip');
            $this->producer = new RdKafka\Producer($conf);
            $this->producer->setLogLevel($this->logLevel);
            $this->producer->addBrokers($this->brokersList);
        }
        return $this->producer;
    }

    public function createTopic(string $name, array $conf = []): RdKafka\ProducerTopic
    {
        $producer = $this->connect();
        if (empty($conf)) {
            return $producer->newTopic($name);
        }

        $topicConf = new RdKafka\TopicConf();
        foreach ($conf as $key => $value) {
            $topicConf->set($key, $value);
        }
        return $producer->newTopic($name, $topicConf);
    }

    public function getMetadata(?RdKafka\Topic $topic = NULL, bool $local = FALSE): RdKafka\Metadata
    {
        $producer = $this->connect();

        return $producer->getMetadata(!$local, $topic, 15 * 1000);
    }

    public function produce(
        RdKafka\ProducerTopic $topic,
        array $payload,
        int $partition = RD_KAFKA_PARTITION_UA,
        ?string $messageKey = NULL
    ): void
    {
        $context = (new SerializationContext())->setSerializeNull(TRUE);
        $payloadStr = $this->serializer->serialize($payload, 'json', $context);
        $this->logger->info('producing Kafka message', [
            'topic' => $topic->getName(),
        ]);

        $producer = $this->connect();
        $topic->produce($partition, self::MSG_FLAGS, $payloadStr, $messageKey);
        $producer->poll(self::AFTER_PRODUCE_RECOVER_TIMEOUT); // serve delivery callbacks
        while ($producer->getOutQLen() > 0) {
            // block until the local queue has been flushed to the broker
            $producer->poll(self::AFTER_PRODUCE_RECOVER_TIMEOUT);
        }

        $this->logger->info('Kafka message produced successfully', [
            'topic' => $topic->getName(),
        ]);
    }
}
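A short usage sketch. Note that jms/serializer and psr/log are used by Producer but not listed in the composer.json above, so they would have to be required as well; the broker list, topic name, and payload here are placeholders:

<?php
require __DIR__ . '/vendor/autoload.php';

use App\Kafka\Producer;
use JMS\Serializer\SerializerBuilder;
use Psr\Log\NullLogger;

$producer = new Producer(
    'kafka1:9092,kafka2:9092',           // brokers as named in docker-compose.yml
    LOG_WARNING,                         // only surface warnings and worse from librdkafka
    new NullLogger(),
    SerializerBuilder::create()->build()
);

// 'request.required.acks' is a standard librdkafka topic property: wait for the leader's ack
$topic = $producer->createTopic('events', ['request.required.acks' => '1']);
$producer->produce($topic, ['id' => 1, 'occurredAt' => date(DATE_ATOM)]);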