PHP Kafka Symfony bundle for php-rdkafka
Installation
Add as a Composer dependency:
composer require simpod/kafka-bundle
Then add SimPodKafkaBundle to Symfony's bundles.php:
use SimPod\KafkaBundle\SimPodKafkaBundle;
return [
    // ...
    SimPodKafkaBundle::class => ['all' => true],
    // ...
];
Usage
This package makes it easier to integrate https://github.com/arnaud-lb/php-rdkafka with Symfony. For details on working with Kafka in PHP, refer to the php-rdkafka documentation.
Available console commands:
- bin/console debug:kafka:consumers to list all available consumer groups
- bin/console kafka:consumer:run <consumer name> to run a consumer instance
Config:
You can create, e.g., a kafka.yaml file in your config directory with the following content:
kafka:
    authentication: '%env(KAFKA_AUTHENTICATION)%'
    bootstrap_servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%'
    client:
        id: 'your-application-name'
- authentication reads the env var KAFKA_AUTHENTICATION, which contains an authentication URI (sasl-plain://user:password); it may also be empty, indicating no authentication.
- bootstrap_servers reads the env var KAFKA_BOOTSTRAP_SERVERS, which contains a comma-separated list of bootstrap servers (broker-1.kafka:9092,broker-2.kafka:9092).
If bootstrap_servers isn't set, it defaults to 127.0.0.1:9092.
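To make the two value formats concrete, here is a minimal sketch of how such strings could be interpreted. Both functions are hypothetical helpers written for illustration; they are not part of the bundle, and the bundle's own parsing may differ.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of the bundle: splits the comma-separated
 * list and falls back to the documented default when nothing is set.
 *
 * @return list<string>
 */
function resolveBootstrapServers(?string $raw): array
{
    if ($raw === null || trim($raw) === '') {
        return ['127.0.0.1:9092'];
    }

    // Drop surrounding whitespace and empty entries such as a trailing comma.
    $servers = array_map('trim', explode(',', $raw));

    return array_values(array_filter($servers, static fn (string $s): bool => $s !== ''));
}

/**
 * Hypothetical helper: splits "sasl-plain://user:password" into its parts.
 * An empty string is treated as "no authentication" and yields null.
 *
 * @return array{mechanism: string, username: string, password: string}|null
 */
function parseAuthentication(string $uri): ?array
{
    if ($uri === '') {
        return null;
    }

    // The "+ [1 => '']" fills in a missing second part instead of failing.
    [$mechanism, $credentials] = explode('://', $uri, 2) + [1 => ''];
    [$username, $password] = explode(':', $credentials, 2) + [1 => ''];

    return ['mechanism' => $mechanism, 'username' => $username, 'password' => $password];
}
```

Splitting the credentials with a limit of 2 keeps any further colons inside the password intact.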
Services
The following services are registered in the container and can be injected as dependencies.
Configuration
class: \SimPod\KafkaBundle\Kafka\Configuration
The Configuration service gives easy access to all the configuration properties.
$config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname());
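Combining the client ID with the host name lets several instances of the same application be told apart on the broker side. The sketch below illustrates the idea only: the function name, the "-" separator, and the fallback value are assumptions, not the bundle's actual implementation.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not the bundle's implementation: appends the host name
 * to the configured client ID. The "-" separator is an assumption.
 */
function clientIdWithHostname(string $clientId, ?string $hostname = null): string
{
    // gethostname() returns false on failure, so fall back to a placeholder.
    $hostname = $hostname ?? (gethostname() ?: 'unknown-host');

    return $clientId . '-' . $hostname;
}
```

Passing the host name explicitly, as the optional second argument allows, keeps the function deterministic in tests.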
Consuming
The bundle provides a NamedConsumer interface. When your consumer implements it, the bundle registers it automatically.
This is an example of a simple consumer; it can then be run via bin/console kafka:consumer:run consumer1
<?php

declare(strict_types=1);

namespace Your\AppNamespace;

use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
use SimPod\KafkaBundle\Kafka\Configuration;
use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer;

final class ExampleKafkaConsumer implements NamedConsumer
{
    private Configuration $configuration;

    public function __construct(Configuration $configuration)
    {
        $this->configuration = $configuration;
    }

    public function run(): void
    {
        $kafkaConsumer = new KafkaConsumer($this->getConfig());
        $kafkaConsumer->subscribe(['topic1']);

        while (true) {
            // ...
        }
    }

    public function getName(): string
    {
        return 'consumer1';
    }

    private function getConfig(): ConsumerConfig
    {
        $config = new ConsumerConfig();

        $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers());
        $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
        $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname());
        $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
        $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group');

        return $config;
    }
}
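The body of the polling loop is left open in the example above. One possible shape for the per-message handling is sketched here as a standalone function, so it can be read and tested without the extension. The function name handleMessage and the ERR_* constants are hypothetical; the numeric values mirror librdkafka's RD_KAFKA_RESP_ERR_NO_ERROR, RD_KAFKA_RESP_ERR__PARTITION_EOF, and RD_KAFKA_RESP_ERR__TIMED_OUT, which you would use directly when php-rdkafka is loaded.

```php
<?php
declare(strict_types=1);

// Numeric values of librdkafka error codes; with the extension loaded, the
// RD_KAFKA_RESP_ERR_* constants would be used instead.
const ERR_NO_ERROR = 0;
const ERR_PARTITION_EOF = -191;
const ERR_TIMED_OUT = -185;

/**
 * Hypothetical helper: handles one polled message.
 * Returns true when the payload was processed and its offset may be committed.
 *
 * @param object   $message any object exposing `err` and `payload`, like RdKafka\Message
 * @param callable $process application callback that receives the payload
 */
function handleMessage(object $message, callable $process): bool
{
    switch ($message->err) {
        case ERR_NO_ERROR:
            $process($message->payload);

            return true;
        case ERR_PARTITION_EOF:
        case ERR_TIMED_OUT:
            // Nothing to process right now; the caller keeps polling.
            return false;
        default:
            throw new RuntimeException(sprintf('Kafka consumer error %d', $message->err));
    }
}
```

Assuming KafkaConsumer exposes php-rdkafka's consume() and commit() methods, the loop could poll with consume(), pass the result to handleMessage(), and call commit() with the message only when it returns true. Committing manually matches the example's ENABLE_AUTO_COMMIT_CONFIG set to false.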
Development
kwn/php-rdkafka-stubs is listed as a dev dependency so that IDEs can properly resolve the classes and functions of the php-rdkafka extension.