dev-master
9999999-devKafka Consumer and Producer Library for PHP
Apache
The Requires
- php >=5.4
by Ross Riley
by Michal Harish
by Pau Gay
by Wojtek Wajerowicz
Kafka Consumer and Producer Library for PHP
This is an alternative to the existing Kafka PHP Client which is in the incubator, the main motivation to write it was that it seemed important that the fetch requests are not loaded entirely into memory but pulled continuously from the socket as well as the fact that PHP has a different control flow and communication pattern (each request is a thread within HTTP server) so the api doesn't need to follow the scala/java object graph and can be much simpler., (*1)
There are few differences to the existing Kafka PHP Client:, (*2)
- Streaming message individually rather than loading the whole response into memory - Offset implemented by hexdecimal tranformation to fully support Kafka long offsets - Gzip working correctly both ways, including the pre-compression message header - Messages produced in batch consumed correctly in compressed as well as uncompressed state - CRC32 check working - Producers and Consumers are abstracted to allow for changes in Kafka API without disrupting the client code - Broker abstraction for different connection strategies - OffsetRequest workaround for 64-bit unix timestamp - Produce request only checks correct bytes were sent (ack not available) - Producer compresses batches of consecutive messages with same compression codec as a single message
There is a set of example scripts under the 'script' folder. The parameters convention are shared across all the scripts but different scripts has different parameters., (*3)
-c Connector, set which Zookeeper server you want to connect -b Broker, the Kafka broker where we want to connect -t Topic, sets the topic where you want to produce -m Message, sets the message you want to produce -l List, will list the available topics -o Offset (optional), sets the starting point where we want to consume -h Help, it will display the help for the script
Those are the available scripts:, (*4)
./scripts/simple/producer -b {broker} -t {topc} ./scripts/simple/producer -b hq-pau-d02:9092 -t test-topic ./scripts/simple/consumer -b {broker} -t {topic} [-o {offset}] ./scripts/simple/consumer -b hq-pau-d02:9092 -t test-topic ./scripts/producers/producer -c {connector} -t {topic} -m {message} ./scripts/producers/producer -c hq-pau-d02:2181 -t test-topic -m "Hello" ./scripts/producers/cached -c {connector} -t {topic} -m {message} ./scripts/producers/cached -c hq-pau-d02:2181 -t test-topic -m "Hello" ./scripts/producers/partitioned -c {connector} -t {topic} -m {message} ./scripts/producers/partitioned -c hq-pau-d02:2181 -t test-topic -m "Hello" ./scripts/producers/daemon -c {connector} -t {topic} ./scripts/producers/daemon -c hq-pau-d02:2181 -t test-topic ./scripts/consumers/consumer -c {connector} -t {topic} ./scripts/consumers/consumer -c hq-pau-d02:2181 -t test-topic ./scripts/consumers/daemon -c {connector} -t {topic} ./scripts/consumers/daemon -c hq-pau-d02:2181 -t test-topic
Tests is a set of native PHP assert() calls included by the main runner:, (*5)
$> ./test
This is not a tutorial, but will ilustrate how to create simple producer and consumer, just to ilustrate how to use the kafka-php library., (*6)
This code will produce a message to the given topic., (*7)
// require kafka-php library require "kafka-php/src/Kafka/Kafka.php"; $connector = "hq-pau-d02:2181"; $topic = "test-topic"; $message = "Hello world!"; $producer = \Kafka\ProducerConnector::Create($connector); // add the message $producer->addMessage($topic, $message); // produce the actual messages into kafka $producer->produce();
This will show how to create a consumer and consume a single message. Not that usefull, but will ilustrate the point., (*8)
// require kafka-php library require "kafka-php/src/Kafka/Kafka.php"; // setting variables $connector = "hq-pau-d02:2181"; $topic = "test-topic"; // create the connector $cc = \Kafka\ConsumerConnector::Create($connector); // create the message stream, we point to the beginning // of the topic offset $messageStream = $cc->createMessageStreams( $topic, 65535, \Kafka\Kafka::OFFSETS_EARLIEST ); // get the message $message = $messageStream[0]->nextMessage(); // output the message echo $message->payload() ."\n";
This consumer will do a similar thing, but will consume all messages for a particular given topic, since the beginning (offset = 0)., (*9)
// require kafka-php library require "kafka-php/src/Kafka/Kafka.php"; // setting variables $connector = "hq-pau-d02:2181"; $topic = "test-topic"; // create the connector $cc = \Kafka\ConsumerConnector::Create($connector); // create the message stream, we point to the beginning // of the topic offset $messageStreams = $cc->createMessageStreams( $topic, 65535, \Kafka\Kafka::OFFSETS_EARLIEST ); // infinite loop while (true) { $fetchCount = 0; foreach ($messageStreams as $mid => $messageStream) { while ($message = $messageStream->nextMessage()) { $fetchCount++; echo $message->payload() . "\n"; } } if ($fetchCount == 0) { echo " --- no more messages ---\n"; die; } }
And finally, some closer to the real usage of this library. A consumer that will listen for new messages produced for a particular topic. The changes with regard to the previous consumers are that this time we ware going to set the highest possible offset, in order to ignore the past messages and only intercept the new ones., (*10)
// require kafka-php library require "kafka-php/src/Kafka/Kafka.php"; // setting variables $connector = "hq-pau-d02:2181"; $topic = "test-topic"; // create the connector $cc = \Kafka\ConsumerConnector::Create($connector); // create the message stream, we point to the end // of the topic offset $messageStreams = $cc->createMessageStreams( $topic, 65535, \Kafka\Kafka::OFFSETS_LATEST ); while (true) { $fetchCount = 0; foreach ($messageStreams as $mid => $messageStream) { // keep getting messages, if we have more while ($message = $messageStream->nextMessage()) { $fetchCount++; // just print topic and payload echo "{$message->payload()}\n"; } } if ($fetchCount == 0) { // no more messages, so sleep and try again sleep(1); } }
Those are the list of pending tasks:, (*11)
TODO - detect 64-bit php and replace Kafka_Offset hex for decimal under the hood, (*12)
TODO - profiling & optimization, (*13)
First prepare for compiling c sources and automake tools if you aren't, (*14)
sudo apt-get install build-essential checkinstall libcppunit-dev autoconf automake libtool ant
Then you'll need to compile the libzookeeper from the c sources, (*15)
sudo git clone git://github.com/apache/zookeeper.git /usr/share/zookeeper cd /usr/share/zookeeper/ sudo ant compile_jute cd src/c ACLOCAL="aclocal -I /usr/local/share/aclocal" sudo autoreconf -if //OR// ACLOCAL="aclocal -I /usr/share/aclocal" sudo autoreconf -if sudo ./configure sudo make sudo make install
Clone php-zookeeper source and build php extension with phpize, (*16)
apt-get install php5-dev sudo git clone git://github.com/andreiz/php-zookeeper.git /usr/share/php-zookeeper cd /usr/share/php-zookeeper git checkout v0.2.1 phpize sudo ./configure sudo make sudo make install sudo echo "extension=zookeeper.so" > /etc/php5/cli/conf.d/zookeeper.ini sudo echo "extension=zookeeper.so" > /etc/php5/apache2/conf.d/zookeeper.ini
Test if it works on cli and restart apache!, (*17)
echo '<?php $zoo = new Zookeeper("localhost:2181"); print_r($zoo->getChildren("/"));' | php service apache2 restart
Kafka Consumer and Producer Library for PHP
Apache