MIT
by Derek Lam
$ composer require hhreact/ampreactor
If you know what InteractiveX is, AmpReactor matches the InteractiveX philosophy closely, straying significantly only for the binding operators.
If, alternatively, you know what ReactiveX is, InteractiveX is the "pull" analogue, where enumerables take the place of observables.
If you know about AmPHP's Iterator, AmpReactor lets you clone, reduce and expand async Iterators, and lets you enforce precise policies on buffering.
<?php
use AmpReactor\InteractiveProducer;

\Amp\Loop::run(function() {
    // Start with any Iterator
    $iter_numbers = new \Amp\Producer(function(callable $emit) {
        for($i = 0; ; yield $emit($i++)) {}
    });

    // Now make it really P R O D U C E
    $number_producer = InteractiveProducer::create($iter_numbers);

    // Transform a stream, e.g. map
    $square_producer = (clone $number_producer)->map(function($root) { return pow($root, 2); });

    // Transform two streams into one, e.g. zip
    $cube_producer = InteractiveProducer::zip($number_producer, $square_producer, function($root, $square) { return $root * $square; });

    // Transform many streams into one, e.g. merge
    // (`Vector{}` literals are Hack-only syntax; this assumes `merge` accepts a plain PHP array)
    $merged_producer = InteractiveProducer::merge([ $number_producer, $square_producer, $cube_producer ]);

    while(yield $merged_producer->advance()) {
        // numbers flying at your face! Beware: no guaranteed order with `merge`
        $item = $merged_producer->getCurrent();
        var_dump($item);

        // To cancel/dispose, just use what the language gives you: `break`, `return` and `throw`;
        // the iterating scope is in full control.
        if($item > 42) {
            unset($merged_producer);
            break;
        }
        // The "Details of InteractiveProducer" section further down explains what
        // happens when you cancel an InteractiveProducer partway
    }

    // Note that InteractiveProducer wraps transparently:
    $cloned_producer = clone $number_producer;
    while(yield $cloned_producer->advance()) { /* same items as $iter_numbers */ }

    // In general, don't try to iterate the original Iterator:
    // you'll probably get a "The prior promise returned must resolve
    // before invoking this method again" exception
});
This library is something of a language port of HHReactor, the original implementation of this functionality, which was written in Hack. In place of Hack's built-in async, this library builds on the Amp concurrency framework.
Amp allows one to write asynchronous code without callbacks in PHP by carefully controlling generators and interfacing between these generators and PHP event loop extensions. This way, async and sync code can blend seamlessly and, in a similar spirit to Hack async, a lot of idioms from the language itself are preserved for async behavior.
By itself, Amp's Producer already:
Foremost, AmpReactor diversifies the backpressure options to match those in InteractiveX: Share, Memoize and Publish (described later). This way, iterators are no longer limited to one consumer at a time.
Many consumers potentially means many useful forms of the same iterator, so almost all of the ReactiveX/InteractiveX operators are built into AmpReactor.
- BaseProducer: manages cloning and accounting on running clones
- InteractiveProducer extends BaseProducer: InteractiveX operators and support for arbitrary scheduling and higher-order iterators. The ⭐ of the show
Most of the operators match the canonical InteractiveX/ReactiveX signatures. The quickest way to scan the signatures is to look at the reference documentation.
Major discrepancies:
- share, memoize, publish: these are replaced by cloning.
- defer operator: no strong motivation to implement it.
InteractiveProducer
If two or more scopes consume the same stream, they can either clone or not clone the InteractiveProducer:
[Ix's Memoize & Publish] If the InteractiveProducer is cloned, the buffer is also cloned, so consumers will receive the same elements from the moment of cloning. In this way, clones act like InteractiveX's Memoize and ReactiveX's Replay.
To emphasize: the clone doesn't see any elements produced by that InteractiveProducer before the clone exists. In this way, Memoize and Publish behavior differ only in when, and from which InteractiveProducer, the consumer takes a clone. Cloning from an InteractiveProducer that is never iterated gives Memoize behavior. Cloning moving InteractiveProducers gives Publish behavior.
Note however that, because the consumer is in control of iteration, the underlying iterators aren't started until the first clone requests the first element.
Behavioral note: as will be mentioned below too, the buffer is managed like Publish rather than Memoize. InteractiveProducer is very straightforward in deciding which nodes are reclaimed because it relies on the garbage collector: once the laggiest consumer advances, the node is reclaimed. It is then a simple and explicit matter of keeping or not keeping unstarted InteractiveProducers, which will or won't hold elements from the very beginning respectively.
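The clone semantics above can be pictured with a small synchronous toy model. This is only a sketch under stated assumptions: `ToyNode` and `ToyProducer` are hypothetical names that do not exist in AmpReactor, a plain generator stands in for the async Iterator, and the buffer is modeled as a singly linked list of nodes with one cursor per consumer.

```php
<?php
// Toy, synchronous model of clone semantics. None of these names exist in AmpReactor.

final class ToyNode {
    public $value;
    public $next = null; // set once the following item has been produced
    public function __construct($value = null) { $this->value = $value; }
}

final class ToyProducer {
    private $source; // underlying iterator, shared by every clone
    private $cursor; // last node this consumer has seen

    public function __construct(\Iterator $source) {
        $this->source = $source;
        $this->cursor = new ToyNode(); // sentinel that precedes the first item
    }

    // PHP's default shallow clone is enough here: a clone shares the source and
    // the node chain, but moves its own cursor independently from then on.

    public function advance(): bool {
        if ($this->cursor->next === null) {
            // Nothing buffered ahead of this consumer: pull from the source.
            if (!$this->source->valid()) { return false; }
            $this->cursor->next = new ToyNode($this->source->current());
            $this->source->next();
        }
        $this->cursor = $this->cursor->next;
        return true;
    }

    public function getCurrent() { return $this->cursor->value; }
}

function toy_numbers(): \Generator {
    for ($i = 0; $i < 6; $i++) { yield $i; }
}

// Collect at most $n items from a consumer.
function toy_take(ToyProducer $producer, int $n): array {
    $out = [];
    while ($n-- > 0 && $producer->advance()) { $out[] = $producer->getCurrent(); }
    return $out;
}

$original = new ToyProducer(toy_numbers());
$memoize  = clone $original;            // cloned before anything is iterated
$moving   = clone $original;
$first_two = toy_take($moving, 2);      // [0, 1]
$publish  = clone $moving;              // cloned from a moving producer

$from_publish = toy_take($publish, 10); // [2, 3, 4, 5]: nothing from before the clone
$from_memoize = toy_take($memoize, 10); // [0, 1, 2, 3, 4, 5]: everything from the start
```

In this model `$original` is never iterated, so it keeps the sentinel node (and therefore the whole chain) reachable; dropping it would let the early nodes be reclaimed once the slowest cursor moves past them.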
[Ix's Share] If the InteractiveProducer is not cloned, consumers all share the same buffer, and hence they compete directly for values.
Behavioral note: All operators implicitly clone their operands to avoid competing with other operators or raw consumers for values; they all implicitly Publish.
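As a rough analogy for Share, two consumers pulling from the same un-cloned stream each see only part of it. The sketch below uses a plain PHP generator in place of an InteractiveProducer (an assumption made purely for illustration; `shared_numbers` is a hypothetical name), with the two consumers taking turns.

```php
<?php
// Analogy only: a plain generator standing in for a shared, un-cloned stream.

function shared_numbers(): \Generator {
    yield from [0, 1, 2, 3, 4, 5];
}

$shared = shared_numbers();
$consumer_a = [];
$consumer_b = [];

// Both consumers pull from the very same iterator, so every value is
// delivered to exactly one of them.
while ($shared->valid()) {
    $consumer_a[] = $shared->current();
    $shared->next();
    if ($shared->valid()) {
        $consumer_b[] = $shared->current();
        $shared->next();
    }
}

// $consumer_a is [0, 2, 4] and $consumer_b is [1, 3, 5]
```

Cloning first, as the operators do implicitly, is what avoids this competition.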
InteractiveProducer
The producing and consuming timelines are separated by a buffer, and if not paused, the underlying iterators don't wait for consumption. That is, once started, they will run and push to the buffer even in the absence of a matching stream of advance calls. As a result, iterators can run as quickly as possible, and memory can be controlled by managing references and sharing behaviors.
Generally, InteractiveProducer relies on the garbage collector to clear the buffer it accumulates from differences in consumption rates between consumers. As the laggiest consumers step their way through the buffer, their references to the earliest nodes of the buffer linked list are shed and the garbage collector clears these unreachable nodes.
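The reclamation described above can be observed directly in plain PHP. The following is a minimal sketch, assuming PHP 7.4 or later for `WeakReference`; `BufferNode` is a hypothetical stand-in for a buffer node, not a class from AmpReactor.

```php
<?php
// Sketch: a singly linked buffer whose head is freed once no consumer points at it.

final class BufferNode {
    public $value;
    public $next = null;
    public function __construct($value) { $this->value = $value; }
}

// Build a three-node buffer: 0 -> 1 -> 2
$head = new BufferNode(0);
$head->next = new BufferNode(1);
$head->next->next = new BufferNode(2);

// Watch node 0 without keeping it alive.
$probe = WeakReference::create($head);

$fast = $head->next->next; // a fast consumer, already at node 2
$slow = $head;             // the laggiest consumer, still at node 0
unset($head);

$alive_while_lagging = $probe->get() !== null; // true: $slow still references node 0

$slow = $slow->next;       // the laggiest consumer advances past node 0

$alive_after_advance = $probe->get() !== null; // false: node 0 is now unreachable
```

Because the list is singly linked and contains no cycles, PHP's reference counting frees node 0 as soon as the last cursor leaves it; no explicit trimming of the buffer is needed in this model.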
The how
You can stop consuming from an InteractiveProducer in a few ways, each with different consequences for resources.
- InteractiveProducers that come from operations on the InteractiveProducer of interest.
The why
When disposing of InteractiveProducers, two factors determine what happens to the iterators and buffers in their ecosystem after they become unreachable:
- next ever been called on it, its clones, or InteractiveProducers coming from its operators?
- InteractiveProducers (e.g. are they results of operators on other InteractiveProducers)?
- AsyncGenerators?
InteractiveProducer is designed with pausing in mind, to meet the need to arbitrarily stop buffering values but keep resources open to resume later. Some informative aspects of the process:
1. InteractiveProducer, it begins "running".
2. InteractiveProducer knows the number of running clones.
3. detaching a running InteractiveProducer decrements the running count.
4. InteractiveProducer:
   - InteractiveProducers by decrementing their running refcounts.*
See 1. InteractiveProducer::_attach(); 2. BaseProducer::running_count, BaseProducer::this_running, BaseProducer::some_running; 3. InteractiveProducer::_detach; 4.1. InteractiveProducer::iterator_to_emitting_generator; 4.2. InteractiveProducer::_detach.
*An InteractiveProducer knows it holds running references to all of its children because, as part of its attachment routine, InteractiveProducer must start iterating them all.
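The running-count bookkeeping outlined above can be modeled with a shared counter. This is a toy sketch, not the library's code: `ToyRunning` and its methods are hypothetical, and it assumes that a producer attaches to its children when its first clone starts running and detaches from them when its last running clone stops.

```php
<?php
// Toy model of running-clone accounting. Not AmpReactor's implementation.

final class ToyRunning {
    private $shared;          // counter object shared by every clone
    private $children;        // producers this one reads from
    private $running = false; // whether this particular clone is running

    public function __construct(array $children = []) {
        $this->shared = new \stdClass();
        $this->shared->count = 0;
        $this->children = $children;
    }

    // A fresh clone shares the counter but has not started running itself.
    public function __clone() { $this->running = false; }

    public function attach(): void {
        if ($this->running) { return; }
        $this->running = true;
        // First running clone: start iterating every child.
        if (++$this->shared->count === 1) {
            foreach ($this->children as $child) { $child->attach(); }
        }
    }

    public function detach(): void {
        if (!$this->running) { return; }
        $this->running = false;
        // Last running clone gone: release the running references on the children.
        if (--$this->shared->count === 0) {
            foreach ($this->children as $child) { $child->detach(); }
        }
    }

    public function runningCount(): int { return $this->shared->count; }
}

$source = new ToyRunning();
$mapped = new ToyRunning([clone $source]); // operators clone their operands
$mapped_clone = clone $mapped;

$mapped->attach();
$mapped_clone->attach();
$while_both_run = [$mapped->runningCount(), $source->runningCount()];  // [2, 1]

$mapped->detach();
$while_one_runs = [$mapped->runningCount(), $source->runningCount()];  // [1, 1]

$mapped_clone->detach();
$after_all_stop = [$mapped->runningCount(), $source->runningCount()];  // [0, 0]
```

In this model the source stays "running" for as long as any clone of the derived producer is running, and is released only when the last one detaches.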