Ytake\KsqlClient [ytake/php-ksql]
Apache kafka / Confluent KSQL REST Client for php, (*1)
, (*2)
, (*3)
What is KSQL
KSQL is the streaming SQL engine for Apache Kafka., (*4)
What Is KSQL?, (*5)
Install
required >= PHP 7.1, (*6)
$ composer require ytake/php-ksql
Usage
Request Preset
class |
Ytake\KsqlClient\Query\CommandStatus |
Ytake\KsqlClient\Query\Status |
Ytake\KsqlClient\Query\ServerInfo |
Ytake\KsqlClient\Query\Ksql |
Ytake\KsqlClient\Query\Stream (for stream) |
Syntax Reference, (*7)
Get Command Status
<?php
use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;
$client = new RestClient(
"http://localhost:8088"
);
$result = $client->requestQuery(
new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();
Get Statuses
<?php
use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;
$client = new RestClient(
"http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();
<?php
use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;
$client = new RestClient(
"http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();
Query KSQL
<?php
use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;
$client = new RestClient(
"http://localhost:8088"
);
$result = $client->requestQuery(
new Ksql('DESCRIBE users_original;')
)->result();
Client for Stream Response
<?php
use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;
$client = new StreamClient(
"http://localhost:8088"
);
$result = $client->requestQuery(
new Stream(
'SELECT * FROM testing',
new class() implements StreamConsumable {
public function __invoke(StreamedRow $row)
{
// stream response consumer
}
}
)
)->result();