2017 © Pedro Peláez
 

library query-reactor

Asynchronous & non-blocking MySQL queries executor.

image

repo2/query-reactor

Asynchronous & non-blocking MySQL queries executor.

  • Sunday, February 21, 2016
  • by regeda
  • Repository
  • 5 Watchers
  • 51 Stars
  • 59 Installations
  • PHP
  • 0 Dependents
  • 1 Suggesters
  • 2 Forks
  • 0 Open issues
  • 3 Versions
  • 0 % Grown

The README.md

Query Reactor

Build Status Latest Stable Version Total Downloads License Scrutinizer Code Quality Code Coverage, (*1)

Query Reactor is a non-blocking MySQL queries executor. The framework is simple and fast. All you need is implement Query or use GenericQuery., (*2)

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$controller = new QueryReactor\Controller\PoolingController([
    'host' => 'localhost',
    'username' => 'root',
    'passwd' => '',
    'dbname' => 'test'
]);

$reactor = new QueryReactor\Reactor($driver, $controller);

$expression = QueryBuilder\select('user', ['id', 'name']);

$query = new QueryReactor\Query\GenericQuery(
    $expression,
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    },
    // on error
    function (\Exception $err) {
        throw $err;
    }
);

$reactor->execQuery($query);

$reactor->await();

Table of contents, (*3)

  1. Installation
  2. Components
  3. Sharding
  4. Restrictions

Installation

Install it with Composer:, (*4)

{
    "require": {
        "repo2/query-reactor": "*"
    }
}

Components

The library requires repo2/query-builder., (*5)

Driver

The Driver provides integration with low level DB API. The API must support non-blocking queries execution., (*6)

Currently the framework implements mysqli driver only., (*7)

Driver usage without Reactor and Controller:, (*8)

use Psr\Log\NullLogger;
use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$driver = new QueryReactor\Driver\Mysqli\MysqliDriver(new NullLogger());

$expression = QueryBuilder\select('user', ['id', 'name']);

$link = $driver->connect(
    ['host' => 'localhost', 'dbname' => 'test'],
    'root',
    'some_secret_passwd'
);

$driver->query($link, $expression);

do {
    list($read, $error) = $driver->poll([$link]);
    foreach ($error as $link) {
        throw $driver->error($link);
    }
    foreach ($read as $link) {
        $result = $driver->read($link);
        foreach ($result->traverse() as $row) {
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
        }
    }
} while (!$read && !$error);

Controller

The Controller provides coordination between driver connection and query execution., (*9)

The framework includes PoolingController. The controller provides basic logic for connection pooling and query queue., (*10)

Query

The Query provides query definition and result processing., (*11)

getExpression

The method returns query expression., (*12)

function Query::getExpression()

returns ExpressionInterface., (*13)

resolve

The method processes the query result and can create a subquery., (*14)

function Query::resolve(QueryReactor\Result $result)

returns \Iterator, Query or null, (*15)

Example of GenericQuery:, (*16)

use Repo2\QueryBuilder;
use Repo2\QueryReactor;

$query = new QueryReactor\Query\GenericQuery(
    // select all users
    QueryBuilder\select('user', ['id', 'name']),
    // on fulfill
    function (QueryReactor\Result $result) {
        foreach ($result->traverse() as $row) {
            // output a user
            echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            yield new QueryReactor\Query\GenericQuery(
                // update account amount by random value
                QueryBuilder\update('account', ['amount' => mt_rand(10, 100)])
                ->where(
                    QueryBuilder\equal('user_id', $row['id'])
                )
            )
        }
    }
);

$reactor = new QueryReactor\Reactor($driver, $controller);
$reactor->execQuery($query);
$reactor->await();

reject

The method processes the query error., (*17)

function Query::reject(\Exception $error)

returns void, (*18)

Sharding

The framework supports sharding by ShardingController., (*19)

You should do 3 simple steps for getting started in the sharding:, (*20)

  1. implement ShardedQuery, (*21)

    use Repo2\QueryBuilder;
    use Repo2\QueryReactor;
    
    class UserQuery implements QueryReactor\Query, QueryReactor\Sharding\ShardedQuery
    {
        public static $table = 'user';
    
        public $id;
    
        public function resolve(QueryReactor\Result $result)
        {
            foreach ($result->traverse() as $row) {
                echo $row['id'], ' -> ', $row['name'], PHP_EOL;
            }
        }
    
        public function reject(\Exception $err)
        {
            throw $err;
        }
    
        public function getExpression()
        {
            return QueryBuilder\select(self::$table, ['id', 'name'])
            ->where(QueryBuilder\equal('id', $this->id));
        }
    
        public function getDistributionName()
        {
            return self::$table;
        }
    
        public function getDistributionId()
        {
            return $this->id;
        }
    }
    
  2. create own ShardingService, (*22)

    use Repo2\QueryReactor;
    
    class SimpleShardingService implements QueryReactor\Sharding\ShardingService
    {
        public static $primary = [
            'host' => 'localhost',
            'username' => 'root',
            'passwd' => '',
            'dbname' => 'test'
        ];
    
        public static $shards = [
            'user' => [
                ['id' => 1, 'dbname' => 'test1'],
                ['id' => 2, 'dbname' => 'test2'],
                ['id' => 3, 'dbname' => 'test3']
            ]
        ];
    
        public function selectGlobal()
        {
            return self::$primary;
        }
    
        public function selectShard($distributionName, $distributionValue)
        {
            $shards = self::$shards[$distributionName];
            $shard = $shards[$distributionValue % count($shards)];
            return $shard + self::$primary;
        }
    }
    
  3. init the controller, (*23)

    use Repo2\QueryReactor;
    
    $controller = new QueryReactor\Sharding\ShardingController(
        new SimpleShardingService(),
        QueryReactor\Controller\PoolingController::class
    );
    
    $reactor = new QueryReactor\Reactor($driver, $controller);
    $reactor->execQuery(new UserQuery($userId));
    $reactor->await();
    

Restrictions

The framework has some restrictions: - No prepared statements. - No "last insert id" in results., (*24)

Source: https://github.com/Repo2/query-reactor, (*25)

The Versions

21/02 2016

dev-master

9999999-dev

Asynchronous & non-blocking MySQL queries executor.

  Sources   Download

MIT

The Requires

 

sql query mysql async non-blocking

09/02 2015

v1.1

1.1.0.0

Asynchronous & non-blocking MySQL queries executor.

  Sources   Download

MIT

The Requires

 

sql query mysql async non-blocking

25/01 2015

v1.0

1.0.0.0

Asynchronous & non-blocking MySQL queries executor.

  Sources   Download

MIT

The Requires

 

sql query mysql async non-blocking