library dot-queue

DotKernel queue component




  • Thursday, February 15, 2018
  • by dotkernel



DotKernel queue component.


  • PHP >= 7.1
  • zendframework/zend-servicemanager
  • zendframework/zend-db (optional - install if using the database adapter)


Run the following command:

$ composer require dotkernel/dot-queue

After installing all dependencies, add \Dot\Queue\ConfigProvider::class to your configuration aggregate in order to register all dependencies and console commands.
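As a minimal sketch of that registration step, assuming an application that builds its configuration with zendframework/zend-config-aggregator (the file name config/config.php and the autoload glob pattern are assumptions borrowed from the usual skeleton layout, not something this package prescribes):

```php
<?php
// config/config.php (sketch; file location and glob pattern are assumptions)

use Zend\ConfigAggregator\ConfigAggregator;
use Zend\ConfigAggregator\PhpFileProvider;

$aggregator = new ConfigAggregator([
    // registers the dot-queue dependencies and console commands
    \Dot\Queue\ConfigProvider::class,

    // application config files, loaded last so they can override package defaults
    new PhpFileProvider('config/autoload/{{,*.}global,{,*.}local}.php'),
]);

return $aggregator->getMergedConfig();
```

Listing the package provider before the autoload files lets your own config/autoload files override the package defaults.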


The following queue implementations are provided by this package:

  • Dot\Queue\Queue\InMemoryQueue - non-persistent queue, based on \SplQueue, mainly for testing purposes
  • Dot\Queue\Queue\PersistentQueue - uses an adapter to persist/fetch jobs to/from a storage

Queues must implement Dot\Queue\Queue\QueueInterface or extend Dot\Queue\Queue\AbstractQueue.

Queue adapters

Queue adapters are used together with the PersistentQueue. Each adapter is specific to the storage it uses. Provided queue adapters:

  • Dot\Queue\Adapter\DatabaseAdapter - based on zend-db, enqueues/dequeues jobs using MySQL storage

Queue adapters must implement Dot\Queue\Adapter\AdapterInterface.

Configuring queues

At this moment, the package offers only a database adapter to be used with the persistent queue. Therefore, the config template below shows how to configure a MySQL queue.

Create a config file in your config/autoload folder, and replace {{QUEUE_NAME}} with something appropriate:


return [
    'dot_queue' => [
        'default_queue' => '{{QUEUE_NAME}}',

        'failed_job_provider' => [
            // these options are specific to the provider used
            // here we give the options of the database failed job provider
            'db_adapter' => 'database',
            'table' => 'failed_jobs',
        ],

        'adapter_manager' => [],
        'adapters' => [
            'database' => [
                'type' => \Dot\Queue\Adapter\DatabaseAdapter::class,
                'options' => [
                    // service name of the configured zend-db adapter
                    'db_adapter' => 'database',
                    'table' => 'jobs',
                    'failed_table' => 'failed_jobs',
                ],
            ],
            // other adapters...
        ],

        'queue_manager' => [],
        'queues' => [
            '{{QUEUE_NAME}}' => [
                // this is the default queue type, if not specified
                // 'type' => \Dot\Queue\Queue\PersistentQueue::class,
                'options' => [
                    'adapter' => 'database',
                    // number of seconds after which a failed job is attempted again
                    'retry_after' => 60,
                    // maybe other queue options later
                ],
            ],
            // other queues...
        ],
    ],
];

You can configure multiple adapters and multiple queues. Multiple queues can also use the same queue adapter.
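To illustrate queues sharing one adapter, here is a sketch of a config fragment with two queues that both use the 'database' adapter from the template. The queue names 'emails' and 'reports' and the retry values are hypothetical; only the configuration keys come from the template, and the 'adapters' and 'failed_job_provider' sections are assumed to be defined as shown there.

```php
<?php
// config/autoload/queue.global.php (fragment; queue names are hypothetical)

return [
    'dot_queue' => [
        'default_queue' => 'emails',

        'queues' => [
            'emails' => [
                'options' => [
                    'adapter' => 'database',
                    'retry_after' => 60,
                ],
            ],
            'reports' => [
                'options' => [
                    // same adapter as the 'emails' queue
                    'adapter' => 'database',
                    // illustrative value: wait longer before retrying a failed report
                    'retry_after' => 300,
                ],
            ],
        ],
    ],
];
```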

Creating job classes

A job represents the unit of work that will be processed as the queue is consumed. Create job classes by extending Dot\Queue\Job\AbstractJob.

A job must declare two methods:

  • process() - called when the job is processed by the queue. Do your work in here.
  • failed($e) - called when the job fails (the max attempts are exceeded). It receives the exception that caused the failure.

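use Dot\Queue\Job\AbstractJob;

class MyJob extends AbstractJob
{
    // called when the job is processed by the queue; do your work in here
    public function process() {}

    // called when the max attempts are exceeded; $e is the exception that caused the failure
    public function failed($e) {}
}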
You can also inject the needed dependencies into the job class. To do that, use a factory class and register the job in the service container.

The QueueManager

Dot\Queue\Queue\QueueManager is the main class to be injected wherever you dispatch jobs to a queue.

To create and dispatch a job:

$job = $queueManager->createJob(MyJob::class);

// set custom data into the job, that you can access when the job will be processed
$job->set('key1', 'some data')
    ->set('key2', 'some other data');

// dispatch the job
$job->dispatch(); //to the default queue OR


  • When creating jobs, always use the queue manager's createJob(className) method. This makes sure the job is fetched from the container and properly initialized.

  • In order to avoid serialization complications, we advise you to set only scalar or array data in a job's payload. This should not be a limitation, because you can inject services into the job that can later fetch objects from the database and so on.

  • Jobs already define some sane defaults for the max attempts, timeout and other job options. Override only what you need.
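The notes above can be put together in a small service that receives the QueueManager through its constructor. This is only a sketch: WelcomeMailer and SendWelcomeEmailJob are hypothetical names, and the only library calls used are the ones shown in this section (createJob(), set() and dispatch()).

```php
<?php

use Dot\Queue\Queue\QueueManager;

// Hypothetical service that queues a welcome email instead of sending it inline.
class WelcomeMailer
{
    /** @var QueueManager */
    private $queueManager;

    public function __construct(QueueManager $queueManager)
    {
        $this->queueManager = $queueManager;
    }

    public function queueWelcomeEmail(string $email, string $name): void
    {
        // always create jobs through the queue manager so they are properly initialized
        // (SendWelcomeEmailJob is a hypothetical job class extending AbstractJob)
        $job = $this->queueManager->createJob(SendWelcomeEmailJob::class);

        // keep the payload scalar; the job can load objects itself when it is processed
        $job->set('email', $email)
            ->set('name', $name);

        // dispatch to the default queue
        $job->dispatch();
    }
}
```

Keeping the dispatch logic in one service means the rest of the application depends on a single method call and never touches the queue classes directly.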

Consuming jobs

Run the following dotkernel command to start the worker loop that consumes the default queue:

$ php dot queue:consume

For details on the supported command options, run:

$ php dot help queue:consume

Useful consumer options:

  • --all - consume all defined queues, in a round-robin fashion
  • --queues= - comma-separated list of queues to consume
  • --max-runtime= - run the consumer only for the specified number of seconds
  • --max-jobs= - run the consumer until the specified number of jobs have been processed (this also includes failed jobs)
  • --sleep= - pause the consumer for the specified number of seconds when the queue is empty

Check the command's help for a full list of options.

In production, we advise you to use a process monitor such as supervisord to make sure that the consumer is kept alive. During development, you can emulate supervisord with the npm package forever.

Database migrations

In order to generate migration files (to be used by the Phinx library) for the jobs table and the failed jobs table, two commands are provided:

  • $ php dot queue:jobs-table
  • $ php dot queue:failed-table

Running these commands will generate migration files with the following default options:

  • the namespace will be set to Data\Database\Migrations
  • the table names will be jobs and failed_jobs respectively
  • the files will be generated in data/database/migrations

You can override these options using the --namespace=, --table-name= and --path options respectively.

After you have generated the files, you can run

$ vendor/bin/phinx --configuration=your/config/file migrate

in order to create the tables.
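For reference, a Phinx configuration file matching the generator defaults might look like the sketch below. Everything in it is an assumption to adapt: the file name phinx.php, the connection values, and the environment name are placeholders. Namespaced migration paths require a Phinx release that supports them, and newer Phinx versions name the 'default_database' key 'default_environment', so check the documentation of your installed version.

```php
<?php
// phinx.php (sketch; all connection values are placeholders)

return [
    'paths' => [
        'migrations' => [
            // namespace => directory, matching the defaults of the generator commands
            'Data\Database\Migrations' => 'data/database/migrations',
        ],
    ],
    'environments' => [
        'default_migration_table' => 'phinxlog',
        'default_database' => 'development',
        'development' => [
            'adapter' => 'mysql',
            'host' => 'localhost',
            'name' => 'my_database',
            'user' => 'my_user',
            'pass' => 'my_password',
            'port' => 3306,
            'charset' => 'utf8',
        ],
    ],
];
```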

Handling failed jobs

We provide several commands to help you manage failed jobs:

  • php dot queue:failed [--queue=] - lists all failed jobs, optionally filtered by queue name
  • php dot queue:flush [--queue=] - removes all failed jobs, optionally filtered by queue name
  • php dot queue:forget <uuid> - removes the job with the specified ID from the failed job list
  • php dot queue:retry [<uuid>] [--queue] - re-dispatches a job back into its queue for retrying. If no ID is given, retries all failed jobs, optionally filtered by queue


