drealecs/thread-worker
PHP multi-thread-worker message/event library
Introduction
Thread-worker is a library that allows tasks to be executed in parallel by multiple PHP processes
on the same computer or on different computers.
The library borrows many of its concepts from other languages.
Concepts
- Task represents a function that should be executed asynchronously.
- Queue is a queue of tasks. One process puts tasks into the queue and another process takes them out for execution.
- Executor is a wrapper over a queue. One PHP script passes tasks to it, and another PHP script uses a RemoteExecutor to work on those tasks.
- TaskResult or TaskException is the result of a Task.

Task, TaskResult and TaskException are the entities that are serialized as "messages" and written to or read from the queue.
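To make the idea of a serialized "message" concrete, here is a minimal sketch using PHP's built-in serialize() and unserialize(). The class SketchResultMessage and its fields are hypothetical stand-ins, and the format the library actually writes to the queue may differ.

```php
<?php
// Hypothetical message class for illustration; not one of the library's entities.
final class SketchResultMessage
{
    public $taskId;
    public $value;

    public function __construct($taskId, $value)
    {
        $this->taskId = $taskId;
        $this->value = $value;
    }
}

// The sending process turns the object into a plain string...
$wire = serialize(new SketchResultMessage('task-1', 8));

// ...and the receiving process rebuilds an independent copy from that string.
$received = unserialize($wire);
```

Only the data stored on the object survives the trip, which is why a message has to carry everything the other side needs.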
API
Task
Code that is executed remotely cannot share variables or context with the calling code. A function
that should be executable asynchronously must therefore be defined as a Task by extending \ThreadWorker\Task and implementing the run() method:
``` php
class AddTask extends ThreadWorker\Task
{
    public function run($a, $b)
    {
        return $a + $b;
    }
}
```
After that, the task can be used like this:
``` php
$task = new AddTask(3, 5);
$result = $task();
```
This sets $result to 8.
Just like a function, a task may or may not return a value.
Of course, the example above executes the task locally and synchronously. To execute it asynchronously we need a Queue and an Executor.
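The calling pattern used above (constructor arguments that are later handed to run() when the object is invoked) can be sketched with a small stand-in base class. SketchTask and SketchAddTask are hypothetical names, and this is only an assumption about how such a base class could work, not the code of \ThreadWorker\Task.

```php
<?php
// Stand-in base class (hypothetical); the real \ThreadWorker\Task may be implemented differently.
abstract class SketchTask
{
    private $parameters;

    // Remember the arguments so they can travel together with the task.
    public function __construct(...$parameters)
    {
        $this->parameters = $parameters;
    }

    // Calling the object like a function forwards the stored arguments to run().
    public function __invoke()
    {
        return $this->run(...$this->parameters);
    }
}

class SketchAddTask extends SketchTask
{
    public function run($a, $b)
    {
        return $a + $b;
    }
}

$task = new SketchAddTask(3, 5);
$result = $task(); // runs locally and synchronously
```

Because the arguments live on the task object itself, the task does not depend on variables from the calling scope, which is what makes it possible to hand it to another process.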
Queue
A queue is used to coordinate the work on tasks.
One process puts a task into the queue, and workers take tasks out of it and run them.
The interface of a task queue:
- public function queue($task, $captureResult) - called to queue a task for execution. Some Tasks return a result and some do not.
  A task that does not return a result is usually preferable because the calling code can do other things, go out of scope or even finish execution.
  For such a task $captureResult must be false, in which case the method does not return anything.
  If we need to execute multiple tasks remotely and join their results, we can pass true as the second parameter, and queue() will return a task
  identifier that can be used later to query and retrieve the task result.
- public function start() - called by the script that executes tasks. This is a blocking method: it blocks until there is a task in the queue.
  It returns a RemoteTask, which is a container for the Task and its TaskResult.
- public function end($remoteTask) - called by the script that executed the task. It marks the task as finished and, if the task is one that returns
  a result, stores the TaskResult.
- public function getResult($taskId) - usually called by the script that queued the task for execution. It can be called only once and blocks
  until the task has finished executing.
- public function isQueued|isRunning|isFinished($taskId) - methods that query the state of a task.
- public function getQueueSize() and getRunningSize() - methods that report how many tasks are queued and how many are running.
Currently there is only one implementation of Queue: \ThreadWorker\RedisQueue. AMQPQueue and MySQLQueue are planned.
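To show how the queue methods listed above fit together, here is an in-memory sketch of that interface. SketchMemoryQueue is a hypothetical class and not one of the library's implementations. It lives in a single process, models the RemoteTask as a plain array, and throws an exception where a real queue would block.

```php
<?php
// In-memory stand-in for the queue interface (hypothetical, single process only).
final class SketchMemoryQueue
{
    private $nextId = 1;
    private $queued = [];   // task id => [task, captureResult], in insertion order
    private $running = [];  // task id => captureResult flag
    private $results = [];  // task id => stored result

    public function queue($task, $captureResult)
    {
        $taskId = 'task-' . $this->nextId++;
        $this->queued[$taskId] = [$task, $captureResult];
        // An identifier is only returned when the result will be captured.
        return $captureResult ? $taskId : null;
    }

    public function start()
    {
        if (count($this->queued) === 0) {
            throw new UnderflowException('Queue is empty; a real queue would block here.');
        }
        reset($this->queued);
        $taskId = key($this->queued); // oldest queued task
        list($task, $captureResult) = $this->queued[$taskId];
        unset($this->queued[$taskId]);
        $this->running[$taskId] = $captureResult;
        // Assumption: the RemoteTask container is modelled as an array.
        return ['id' => $taskId, 'task' => $task, 'result' => null];
    }

    public function end(array $remoteTask)
    {
        $taskId = $remoteTask['id'];
        if ($this->running[$taskId]) {
            $this->results[$taskId] = $remoteTask['result'];
        }
        unset($this->running[$taskId]);
    }

    public function getResult($taskId)
    {
        if (!array_key_exists($taskId, $this->results)) {
            throw new RuntimeException('No result yet; a real queue would block here.');
        }
        $result = $this->results[$taskId];
        unset($this->results[$taskId]); // a result can be fetched only once
        return $result;
    }

    public function isQueued($taskId) { return isset($this->queued[$taskId]); }
    public function isRunning($taskId) { return isset($this->running[$taskId]); }
    public function isFinished($taskId) { return array_key_exists($taskId, $this->results); }
    public function getQueueSize() { return count($this->queued); }
    public function getRunningSize() { return count($this->running); }
}

// Producer side: queue a task and ask for its result to be captured.
$queue = new SketchMemoryQueue();
$taskId = $queue->queue(function () { return 3 + 5; }, true);
$wasQueued = $queue->isQueued($taskId);

// Worker side: take the task, run it, and hand the result back.
$remoteTask = $queue->start();
$wasRunning = $queue->isRunning($taskId);
$remoteTask['result'] = $remoteTask['task']();
$queue->end($remoteTask);

// Producer side again: collect the result.
$wasFinished = $queue->isFinished($taskId);
$result = $queue->getResult($taskId);
```

In a real deployment the producer and the worker are separate processes, so the state kept in these arrays would have to live in a shared backend such as Redis.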
Executor
An Executor wraps a queue and provides a simpler interface for queuing tasks, working on tasks and getting task results.
There is a QueueExecutor that has two methods:
- void execute(Task $task) - adds the task to the queue
- QueuedTask submit(Task $task) - adds the task to the queue and returns a QueuedTask instance that can be used to query task status and retrieve the task result.
Let's look at an example:
``` php
$queue = new ThreadWorker\RedisQueue('example');
$executor = new ThreadWorker\QueueExecutor($queue);

$task = new AddTask(3, 5);
$queuedTask = $executor->submit($task);
$result = $queuedTask->getResult()->getValue();
```
QueueExecutor is extended by RemoteExecutor, which does the asynchronous running of the tasks.
A worker's code would look like this:
``` php
$queue = new ThreadWorker\RedisQueue('example');
$worker = new ThreadWorker\RemoteExecutor($queue);
$worker->work();
```
An instance of RemoteExecutor is passed as an extra parameter to the run() method of the task and can be used to queue more tasks.
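The idea of a task receiving the executor and using it to queue follow-up tasks can be sketched as follows. SketchWorker and SketchCountdownTask are hypothetical stand-ins. The sketch assumes the executor arrives after the task's own parameters, runs in a single process on top of SplQueue, and stops when the queue is empty, whereas a real worker would keep waiting for new tasks.

```php
<?php
// Single-process stand-in for an executor with a work loop (hypothetical).
final class SketchWorker
{
    public $log = [];
    private $pending;

    public function __construct()
    {
        $this->pending = new SplQueue();
    }

    // Mirrors execute(): queue a task without capturing its result.
    public function execute(callable $task)
    {
        $this->pending->enqueue($task);
    }

    // Run queued tasks in order, handing this executor to each of them.
    public function work()
    {
        while (!$this->pending->isEmpty()) {
            $task = $this->pending->dequeue();
            $task($this);
        }
    }
}

final class SketchCountdownTask
{
    private $n;

    public function __construct($n)
    {
        $this->n = $n;
    }

    // Assumption: the executor is the extra, last parameter of run().
    public function run($n, SketchWorker $executor)
    {
        $executor->log[] = $n;
        if ($n > 0) {
            // A running task queues a follow-up task through the executor it was given.
            $executor->execute(new SketchCountdownTask($n - 1));
        }
    }

    public function __invoke(SketchWorker $executor)
    {
        return $this->run($this->n, $executor);
    }
}

$worker = new SketchWorker();
$worker->execute(new SketchCountdownTask(2));
$worker->work();
```

After work() returns, $worker->log holds 2, 1 and 0 in that order: each task ran once and queued its successor.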