2017 © Pedro PelĂĄez
 

library php-consumergroup

image

meitu/php-consumergroup

  • Monday, April 2, 2018
  • by mmzp
  • Repository
  • 9 Watchers
  • 33 Stars
  • 29 Installations
  • PHP
  • 0 Dependents
  • 0 Suggesters
  • 4 Forks
  • 0 Open issues
  • 6 Versions
  • 81 % Grown

The README.md

php-consumergroup

php-consumergroup is a kafka consumer library with group and rebalance supports., (*1)

Chinese Doc, (*2)

Requirements

  • Apache Kafka 0.7.x, 0.8.x, 0.9.x, 0.10.x

Dependencies

Performance

78,000+ messages/s for single process, (*3)

more detail benchmark, (*4)

Example

  • installing this library via composer
<?php 
use MTKafka\Consumer;

function call_back_func($msg) {
    echo "$msg->payload\n";
}

function handle_error_call_back($msg) {
    echo $msg->errstr();
}

$consumer = New Consumer("localhost:2181");
$consumer->setGroupId("group-test");
$consumer->setTopic("topic-test");
$consumer->setOffsetAutoReset(Consumer::SMALLEST);
$consumer->setErrHandler("handle_error_call_back");

try {
    $consumer->start("echo_message");
}
catch(Exception $e) {
    printf("error: %s\n", $e->getMessage());
}

see example.php, (*5)

Consumer Options

Consumer::setMaxMessage()

Number, defaults to 32, (*6)

If partitions > 1, it forces consumers to switch to other partitons when max message is reached, or other partitons will be starved, (*7)

Consumer::setCommitInterval()

Millisecond, defaults to 500ms, (*8)

Offset auto commit interval., (*9)

Consumer::setWatchInterval()

Millisecond, defaults to 10,000 ms, (*10)

Time interval to check rebalance. Rebalance is triggered when the number of partition or consumer changes., (*11)

Consumer::setConsumeTimeout()

Millisecond, default is 1,000 ms, (*12)

Kafka request timeout., (*13)

Consumer::setClientId()

String, defaults to "default", (*14)

Client id is used to identify consumers., (*15)

Consumer::setOffsetAutoReset()

smallest|largest, defaults to smallest, (*16)

Consumer can choose whether to fetch the oldest or the lastest message when offset isn't present in zookeeper or is out of range., (*17)

Consumer::setConf()

Attribute and value are passed in, (*18)

We can use this function to modify the librdkafka configuration。, (*19)

more detail about librdkafka configuration., (*20)

Exception

  • Recoverable exceptption (e.g. request timeout), error handler will be called, and you can log error messages.
  • Unrecoverable exception (e.g. kafka/zookeeper is broken), exceptions will be thrown, and you should log message and stop the consumer.

Benchmark

Type Parmeter
CPU Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz
CPU Core 24
Memory 16G
Disk SSD
Network 1000Mbit/s
Os CentOS release 6.7

Benchmark is measured by produring 20,000,000 messages at single partition, and calculate the time it takes to consume those messages., (*21)

QPS is 78,000 messages/s when process cpu utility is 100%., (*22)

The Versions

02/04 2018

1.0.0

1.0.0.0

  Sources   Download

30/10 2017

dev-master

9999999-dev

php kafka consumer

  Sources   Download

Apache-2.0

The Development Requires

by lzf

06/07 2017

1.0.2

1.0.2.0

php kafka consumer

  Sources   Download

Apache-2.0

The Development Requires

by lzf

06/07 2017

dev-fix/issue-2-duplicate-group

dev-fix/issue-2-duplicate-group

php kafka consumer

  Sources   Download

Apache-2.0

The Development Requires

by lzf

05/07 2017

dev-refactor/exception_messsage

dev-refactor/exception_messsage

php kafka consumer

  Sources   Download

Apache-2.0

The Development Requires

by lzf

17/01 2017

1.0.1

1.0.1.0

php kafka consumer

  Sources   Download

Apache-2.0

The Development Requires

by lzf