php-consumergroup
php-consumergroup is a kafka consumer library with group and rebalance supports., (*1)
Chinese Doc, (*2)
Requirements
- Apache Kafka 0.7.x, 0.8.x, 0.9.x, 0.10.x
Dependencies
78,000+ messages/s for single process, (*3)
more detail benchmark, (*4)
Example
- installing this library via composer
<?php
use MTKafka\Consumer;
function call_back_func($msg) {
echo "$msg->payload\n";
}
function handle_error_call_back($msg) {
echo $msg->errstr();
}
$consumer = New Consumer("localhost:2181");
$consumer->setGroupId("group-test");
$consumer->setTopic("topic-test");
$consumer->setOffsetAutoReset(Consumer::SMALLEST);
$consumer->setErrHandler("handle_error_call_back");
try {
$consumer->start("echo_message");
}
catch(Exception $e) {
printf("error: %s\n", $e->getMessage());
}
see example.php, (*5)
Consumer Options
Consumer::setMaxMessage()
Number, defaults to 32, (*6)
If partitions > 1, it forces consumers to switch to other partitons when max message is reached, or other partitons will be starved, (*7)
Consumer::setCommitInterval()
Millisecond, defaults to 500ms, (*8)
Offset auto commit interval., (*9)
Consumer::setWatchInterval()
Millisecond, defaults to 10,000 ms, (*10)
Time interval to check rebalance. Rebalance is triggered when the number of partition or consumer changes., (*11)
Consumer::setConsumeTimeout()
Millisecond, default is 1,000 ms, (*12)
Kafka request timeout., (*13)
Consumer::setClientId()
String, defaults to "default", (*14)
Client id is used to identify consumers., (*15)
Consumer::setOffsetAutoReset()
smallest|largest, defaults to smallest, (*16)
Consumer can choose whether to fetch the oldest or the lastest message when offset isn't present in zookeeper or is out of range., (*17)
Consumer::setConf()
Attribute and value are passed in, (*18)
We can use this function to modify the librdkafka configurationă, (*19)
more detail about librdkafka configuration., (*20)
Exception
- Recoverable exceptption (e.g. request timeout), error handler will be called, and you can log error messages.
- Unrecoverable exception (e.g. kafka/zookeeper is broken), exceptions will be thrown, and you should log message and stop the consumer.
Benchmark
| Type |
Parmeter |
| CPU |
Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz |
| CPU Core |
24 |
| Memory |
16G |
| Disk |
SSD |
| Network |
1000Mbit/s |
| Os |
CentOS release 6.7 |
Benchmark is measured by produring 20,000,000 messages at single partition, and calculate the time it takes to consume those messages., (*21)
QPS is 78,000 messages/s when process cpu utility is 100%., (*22)