A multi-process worker framework based on Swoole and Gearman.
$ composer require baohan/swoole-gearman
But you should first set up the runtime infrastructure with the specified versions of swoole, gearman and php.
Run with Docker
$ docker pull baohanddd/swoole-gearman:1.6.1
$ docker pull artefactual/gearmand
$ docker network create swoole-gearman
$ docker run --rm --name=gearman --network=swoole-gearman artefactual/gearmand
$ cd your_project/
$ docker run --rm --name=worker --network=swoole-gearman -v=$(pwd):/data -w=/data baohanddd/swoole-gearman:1.6.1 php server.php
Here is a simple server: it starts up and spawns a number of workers that wait for the next job.
    <?php
    use baohan\SwooleGearman\Collection;
    use baohan\SwooleGearman\Server;
    use Monolog\Logger;

    require('vendor/autoload.php');

    try {
        $s = new Server(Logger::INFO);
        $s->worker_num = 1;
        $s->task_worker_num = 19;
        // Register a job handler under the name that clients will send.
        $s->addCallback('timestamp::print', function () {
            return new class() extends \baohan\SwooleGearman\Job {
                public function execute(Collection $payload, int $workerId): bool
                {
                    sleep(1); // simulate one second of work
                    echo "worker[{$workerId}] => ".$payload['message'].PHP_EOL;
                    return true;
                }
            };
        });
        $s->start();
    } catch (Throwable $e) {
        echo $e->getMessage();
    }
The server starts and logs:
[2021-08-20 05:52:59] swoole-gearman.INFO: server starting... {"host":"127.0.0.1","port":9500,"worker_num":1,"task_worker_num":19,"task_max_request":500} []
Let's write a simple client:
    <?php
    use Swoole\Client;

    $client = new Client(SWOOLE_SOCK_TCP);
    if (!$client->connect('127.0.0.1', 9500, 0.5)) {
        echo "connect failed. Error: {$client->errCode}\n";
        exit(1); // nothing to send without a connection
    }

    // 'name' selects the registered callback, 'data' is its payload.
    $data = [
        'name' => 'timestamp::print',
        'data' => [
            'message' => ''
        ]
    ];
    for ($i = 0; $i < 1024; $i++) {
        $data['data']['message'] = 'query on '.$i;
        $client->send(json_encode($data));
        echo $client->recv();
    }
    $client->close();
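The message sent by the client is a JSON object with a `name` key (matching the name passed to `addCallback`) and a `data` key (which, judging by the examples here, becomes the payload given to `execute`). As a minimal sketch, the encoding step could be pulled into a small helper. `buildJobMessage` is a hypothetical name used for illustration and is not part of the library.

```php
<?php
declare(strict_types=1);

/**
 * Build the JSON message used by the client example.
 * Hypothetical helper: the name and signature are illustrative only.
 */
function buildJobMessage(string $name, array $data): string
{
    // JSON_THROW_ON_ERROR raises an exception instead of returning false.
    return json_encode(['name' => $name, 'data' => $data], JSON_THROW_ON_ERROR);
}

echo buildJobMessage('timestamp::print', ['message' => 'query on 0']), PHP_EOL;
// prints {"name":"timestamp::print","data":{"message":"query on 0"}}
```

With such a helper, the loop body would reduce to `$client->send(buildJobMessage('timestamp::print', ['message' => 'query on '.$i]));`.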
The results:
    ......
    worker[11] => query on 1019
    worker[15] => query on 1023
    worker[14] => query on 1022
    worker[9] => query on 1017
    worker[5] => query on 1013
    worker[8] => query on 1016
    worker[4] => query on 1012
    worker[2] => query on 1010
    worker[13] => query on 1021
    worker[12] => query on 1020
    worker[7] => query on 1015
    ......
Now let's post jobs to a Gearman job server.
To do that, just replace \baohan\SwooleGearman\Server with \baohan\SwooleGearman\Gearman. Note that the task_worker_num option is no longer needed, and that host and port must be the same as those of the Gearman job server.
    <?php
    use baohan\SwooleGearman\Collection;
    use Monolog\Logger;

    require('vendor/autoload.php');

    try {
        $s = new \baohan\SwooleGearman\Gearman(Logger::INFO);
        $s->worker_num = 10;
        // Address of the Gearman job server, not of a Swoole server.
        $s->host = 'gearman';
        $s->port = '4730';
        $s->addCallback('timestamp::print', function () {
            return new class() extends \baohan\SwooleGearman\Job {
                public function execute(Collection $payload, int $workerId): bool
                {
                    sleep(1); // simulate one second of work
                    echo "worker[{$workerId}] => ".$payload['message'].PHP_EOL;
                    return true;
                }
            };
        });
        $s->start();
    } catch (Throwable $e) {
        echo $e->getMessage();
    }
On the client side, we use a Gearman client to post jobs to the Gearman server instead of the Swoole server.
    <?php
    $gmc = new GearmanClient();
    $gmc->addServer('gearman', 4730);

    $data = [
        'name' => 'timestamp::print',
        'data' => [
            'message' => ''
        ]
    ];
    for ($i = 0; $i < 1024; $i++) {
        $data['data']['message'] = $i + 1;
        // Queue the job without waiting for its result.
        $gmc->doBackground('timestamp::print', json_encode($data));
        if ($gmc->returnCode() != GEARMAN_SUCCESS) {
            echo "bad return code".PHP_EOL;
            exit;
        }
    }
The big advantage of this mode is that jobs are not lost even if a worker server crashes, because all jobs are stored in Gearman. In addition, clients only need to know the Gearman server's address and port, which makes it easier to deploy multiple instances of the Swoole server on different machines.
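Since every worker instance only needs the Gearman address, one way to reuse the same worker script on several machines is to read the host and port from the environment. The following is a minimal sketch under that assumption: `gearmanEndpoint` and the variable names `GEARMAN_HOST` and `GEARMAN_PORT` are hypothetical and not defined by the library, and the defaults simply mirror the values used in the example above.

```php
<?php
declare(strict_types=1);

/**
 * Resolve the Gearman endpoint from a map of environment variables.
 * GEARMAN_HOST and GEARMAN_PORT are hypothetical names chosen for this sketch.
 */
function gearmanEndpoint(array $env): array
{
    return [
        'host' => $env['GEARMAN_HOST'] ?? 'gearman',
        'port' => (string) ($env['GEARMAN_PORT'] ?? '4730'),
    ];
}

// getenv() without arguments returns all environment variables as an array.
$endpoint = gearmanEndpoint(getenv());

// In the worker script these values would be assigned to $s->host and $s->port.
echo $endpoint['host'], ':', $endpoint['port'], PHP_EOL;
```

Passing the environment in as an argument keeps the function easy to exercise without touching the real process environment.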
That's all.