WQ-Redis (mle86/wq-redis
)
This package contains the PHP class
mle86\WQ\WorkServerAdapter\RedisWorkServer
., (*1)
It supplements the
mle86/wq package
by implementing its WorkServerAdapter
interface., (*2)
It connects to a Redis server
using the phpredis extension., (*3)
Version and Compatibility
This is
version 1.0.2
of mle86/wq-redis
., (*4)
It was developed for
version 1.0.0
of mle86/wq
and should be compatible
with all of its future 1.x versions as well., (*5)
Installation and Dependencies
$ sudo apt install php-redis # to install the phpredis extension
$ composer require mle86/wq-redis
It depends on PHP 7.1,
mle86/wq,
and the phpredis extension., (*6)
Class reference
(class mle86\WQ\WorkServerAdapter\RedisWorkServer implements WorkServerAdapter
), (*7)
It connects to a Redis server., (*8)
Because Redis does not have
delayed entries,
reserved entries,
or buried entries,
this class uses several custom workarounds
to emulate those features., (*9)
For every $workQueue
used,
this class will create multiple Redis keys:, (*10)
-
_wq.$workQueue
(ready jobs – List)
-
_wq_delay.$workQueue
(delayed jobs – Ordered Set)
-
_wq_buried.$workQueue
(buried jobs – List)
The delaying mechanism was inspired by
this StackOverflow response., (*11)
-
public function __construct (\Redis $serverConnection)
Takes an already-configured Redis
instance to work with.
Does not attempt to establish a connection itself –
use the connect()
factory method for that instead
or do it with Redis::connect()
prior to using this constructor.
-
public function connect ($host = "localhost", $port = 6379, $timeout = 0.0, $retry_interval = 0)
Factory method.
This will create a new Redis
instance by itself.
See Redis::connect()
for the parameter descriptions.
Interface methods
which are documented in the WorkServerAdapter
interface:, (*12)
public function storeJob (string $workQueue, Job $job, int $delay = 0)
public function getNextQueueEntry ($workQueue, int $timeout = DEFAULT_TIMEOUT) : ?QueueEntry
public function buryEntry (QueueEntry $entry)
public function requeueEntry (QueueEntry $entry, int $delay, string $workQueue = null)
public function deleteEntry (QueueEntry $entry)
Usage example
<?php
use mle86\WQ\WorkServerAdapter\RedisWorkServer;
use mle86\WQ\WorkProcessor;
use mle86\WQ\Job\Job;
$processor = new WorkProcessor( new RedisWorkServer("localhost") );
while (true) {
$processor->processNextJob("webhook", function(Job $job) {
$job->...;
});
}
This executes all jobs available in the local Redis server's “webhook
” queue, forever.
It will however abort if one of the jobs throws an exception –
you might want to add a logging try-catch block around the processNextJob()
call
as shown in WQ's “Quick Start” example., (*13)