diff --git a/src/ExtEvLoop.php b/src/ExtEvLoop.php index fedd5884..c7439bdc 100644 --- a/src/ExtEvLoop.php +++ b/src/ExtEvLoop.php @@ -20,7 +20,7 @@ * @see http://php.net/manual/en/book.ev.php * @see https://bitbucket.org/osmanov/pecl-ev/overview */ -class ExtEvLoop implements LoopInterface +class ExtEvLoop implements ExtLoopInterface { /** * @var EvLoop @@ -47,6 +47,11 @@ class ExtEvLoop implements LoopInterface */ private $writeStreams = array(); + /** + * @var bool[] + */ + private $allStreams = array(); + /** * @var bool */ @@ -62,6 +67,10 @@ class ExtEvLoop implements LoopInterface */ private $signalEvents = array(); + private $dereferences = array(); + private $derefTimers = 0; + private $derefStreams = 0; + public function __construct() { $this->loop = new EvLoop(); @@ -81,6 +90,7 @@ public function addReadStream($stream, $listener) $callback = $this->getStreamListenerClosure($stream, $listener); $event = $this->loop->io($stream, Ev::READ, $callback); $this->readStreams[$key] = $event; + $this->allStreams[$key] = true; } /** @@ -107,6 +117,7 @@ public function addWriteStream($stream, $listener) $callback = $this->getStreamListenerClosure($stream, $listener); $event = $this->loop->io($stream, Ev::WRITE, $callback); $this->writeStreams[$key] = $event; + $this->allStreams[$key] = true; } public function removeReadStream($stream) @@ -118,7 +129,7 @@ public function removeReadStream($stream) } $this->readStreams[$key]->stop(); - unset($this->readStreams[$key]); + unset($this->readStreams[$key], $this->allStreams[$key]); } public function removeWriteStream($stream) @@ -130,7 +141,7 @@ public function removeWriteStream($stream) } $this->writeStreams[$key]->stop(); - unset($this->writeStreams[$key]); + unset($this->writeStreams[$key], $this->allStreams[$key]); } public function addTimer($interval, $callback) @@ -183,6 +194,60 @@ public function futureTick($listener) $this->futureTickQueue->add($listener); } + public function reference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + public function run() { $this->running = true; @@ -191,16 +256,15 @@ public function run() $this->futureTickQueue->tick(); $hasPendingCallbacks = !$this->futureTickQueue->isEmpty(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timers->count() > $this->derefTimers; + $wasJustStopped = !$this->running; - $nothingLeftToDo = !$this->readStreams - && !$this->writeStreams - && !$this->timers->count() - && $this->signals->isEmpty(); $flags = Ev::RUN_ONCE; if ($wasJustStopped || $hasPendingCallbacks) { $flags |= Ev::RUN_NOWAIT; - } elseif ($nothingLeftToDo) { + } elseif (!$hasStreams && !$hasTimers) { break; } diff --git a/src/ExtEventLoop.php b/src/ExtEventLoop.php index fd403d4a..038c65b9 100644 --- a/src/ExtEventLoop.php +++ b/src/ExtEventLoop.php @@ -20,7 +20,7 @@ * * @link https://pecl.php.net/package/event */ -final class ExtEventLoop implements LoopInterface +final class ExtEventLoop implements ExtLoopInterface { private $eventBase; private $futureTickQueue; @@ -31,12 +31,17 @@ final class ExtEventLoop implements LoopInterface private $writeEvents = array(); private $readListeners = array(); private $writeListeners = array(); + private $allStreams = array(); private $readRefs = array(); private $writeRefs = array(); private $running; private $signals; private $signalEvents = array(); + private $dereferences = array(); + private $derefTimers = 0; + private $derefStreams = 0; + public function __construct() { if (!\class_exists('EventBase', false)) { @@ -66,6 +71,7 @@ public function addReadStream($stream, $listener) $event->add(); $this->readEvents[$key] = $event; $this->readListeners[$key] = $listener; + $this->allStreams[$key] = true; // ext-event does not increase refcount on stream resources for PHP 7+ // manually keep track of stream resource to prevent premature garbage collection @@ -85,6 +91,7 @@ public function addWriteStream($stream, $listener) $event->add(); $this->writeEvents[$key] = $event; $this->writeListeners[$key] = $listener; + $this->allStreams[$key] = true; // ext-event does not increase refcount on stream resources for PHP 7+ // manually keep track of stream resource to prevent premature garbage collection @@ -102,7 +109,8 @@ public function removeReadStream($stream) unset( $this->readEvents[$key], $this->readListeners[$key], - $this->readRefs[$key] + $this->readRefs[$key], + $this->allStreams[$key] ); } } @@ -116,7 +124,8 @@ public function removeWriteStream($stream) unset( $this->writeEvents[$key], $this->writeListeners[$key], - $this->writeRefs[$key] + $this->writeRefs[$key], + $this->allStreams[$key] ); } } @@ -172,6 +181,60 @@ public function removeSignal($signal, $listener) } } + public function reference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + public function run() { $this->running = true; @@ -179,10 +242,13 @@ public function run() while ($this->running) { $this->futureTickQueue->tick(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timerEvents->count() > $this->derefTimers; + $flags = EventBase::LOOP_ONCE; if (!$this->running || !$this->futureTickQueue->isEmpty()) { $flags |= EventBase::LOOP_NONBLOCK; - } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) { + } elseif (!$hasStreams && !$hasTimers && $this->signals->isEmpty()) { break; } diff --git a/src/ExtLibevLoop.php b/src/ExtLibevLoop.php index 193c6c0d..34062b9c 100644 --- a/src/ExtLibevLoop.php +++ b/src/ExtLibevLoop.php @@ -24,17 +24,22 @@ * @see https://github.com/m4rw3r/php-libev * @see https://gist.github.com/1688204 */ -final class ExtLibevLoop implements LoopInterface +final class ExtLibevLoop implements ExtLoopInterface { private $loop; private $futureTickQueue; private $timerEvents; private $readEvents = array(); private $writeEvents = array(); + private $allStreams = array(); private $running; private $signals; private $signalEvents = array(); + private $dereferences = array(); + private $derefTimers = 0; + private $derefStreams = 0; + public function __construct() { if (!\class_exists('libev\EventLoop', false)) { @@ -61,6 +66,7 @@ public function addReadStream($stream, $listener) $this->loop->add($event); $this->readEvents[(int) $stream] = $event; + $this->allStreams[(int) $stream] = true; } public function addWriteStream($stream, $listener) @@ -77,6 +83,7 @@ public function addWriteStream($stream, $listener) $this->loop->add($event); $this->writeEvents[(int) $stream] = $event; + $this->allStreams[(int) $stream] = true; } public function removeReadStream($stream) @@ -86,7 +93,7 @@ public function removeReadStream($stream) if (isset($this->readEvents[$key])) { $this->readEvents[$key]->stop(); $this->loop->remove($this->readEvents[$key]); - unset($this->readEvents[$key]); + unset($this->readEvents[$key], $this->allStreams[$key]); } } @@ -97,7 +104,7 @@ public function removeWriteStream($stream) if (isset($this->writeEvents[$key])) { $this->writeEvents[$key]->stop(); $this->loop->remove($this->writeEvents[$key]); - unset($this->writeEvents[$key]); + unset($this->writeEvents[$key], $this->allStreams[$key]); } } @@ -174,6 +181,60 @@ public function removeSignal($signal, $listener) } } + public function reference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + public function run() { $this->running = true; @@ -181,10 +242,13 @@ public function run() while ($this->running) { $this->futureTickQueue->tick(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timerEvents->count() > $this->derefTimers; + $flags = EventLoop::RUN_ONCE; if (!$this->running || !$this->futureTickQueue->isEmpty()) { $flags |= EventLoop::RUN_NOWAIT; - } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) { + } elseif (!$hasStreams && !$hasTimers) { break; } diff --git a/src/ExtLibeventLoop.php b/src/ExtLibeventLoop.php index 55c2fca0..3b11e2d1 100644 --- a/src/ExtLibeventLoop.php +++ b/src/ExtLibeventLoop.php @@ -33,7 +33,7 @@ * * @link https://pecl.php.net/package/libevent */ -final class ExtLibeventLoop implements LoopInterface +final class ExtLibeventLoop implements ExtLoopInterface { /** @internal */ const MICROSECONDS_PER_SECOND = 1000000; @@ -47,10 +47,15 @@ final class ExtLibeventLoop implements LoopInterface private $writeEvents = array(); private $readListeners = array(); private $writeListeners = array(); + private $allStreams = array(); private $running; private $signals; private $signalEvents = array(); + private $dereferences = array(); + private $derefTimers = 0; + private $derefStreams = 0; + public function __construct() { if (!\function_exists('event_base_new')) { @@ -80,6 +85,7 @@ public function addReadStream($stream, $listener) $this->readEvents[$key] = $event; $this->readListeners[$key] = $listener; + $this->allStreams[$key] = true; } public function addWriteStream($stream, $listener) @@ -96,6 +102,7 @@ public function addWriteStream($stream, $listener) $this->writeEvents[$key] = $event; $this->writeListeners[$key] = $listener; + $this->allStreams[$key] = true; } public function removeReadStream($stream) @@ -109,7 +116,8 @@ public function removeReadStream($stream) unset( $this->readEvents[$key], - $this->readListeners[$key] + $this->readListeners[$key], + $this->allStreams[$key] ); } } @@ -125,7 +133,8 @@ public function removeWriteStream($stream) unset( $this->writeEvents[$key], - $this->writeListeners[$key] + $this->writeListeners[$key], + $this->allStreams[$key] ); } } @@ -187,6 +196,60 @@ public function removeSignal($signal, $listener) } } + public function reference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timerEvents->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + public function run() { $this->running = true; @@ -194,10 +257,13 @@ public function run() while ($this->running) { $this->futureTickQueue->tick(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timerEvents->count() > $this->derefTimers; + $flags = \EVLOOP_ONCE; if (!$this->running || !$this->futureTickQueue->isEmpty()) { $flags |= \EVLOOP_NONBLOCK; - } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) { + } elseif (!$hasStreams && !$hasTimers) { break; } diff --git a/src/ExtLoopInterface.php b/src/ExtLoopInterface.php new file mode 100644 index 00000000..5ffad805 --- /dev/null +++ b/src/ExtLoopInterface.php @@ -0,0 +1,32 @@ +timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + /** * {@inheritdoc} */ @@ -211,11 +270,10 @@ public function run() $this->futureTickQueue->tick(); $hasPendingCallbacks = !$this->futureTickQueue->isEmpty(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timers->count() > $this->derefTimers; + $wasJustStopped = !$this->running; - $nothingLeftToDo = !$this->readStreams - && !$this->writeStreams - && !$this->timers->count() - && $this->signals->isEmpty(); // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers, // otherwise use UV::RUN_NOWAIT. @@ -223,7 +281,7 @@ public function run() $flags = \UV::RUN_ONCE; if ($wasJustStopped || $hasPendingCallbacks) { $flags = \UV::RUN_NOWAIT; - } elseif ($nothingLeftToDo) { + } elseif (!$hasStreams && !$hasTimers) { break; } @@ -241,6 +299,8 @@ public function stop() private function addStream($stream) { + $this->allStreams[(int) $stream] = true; + if (!isset($this->streamEvents[(int) $stream])) { $this->streamEvents[(int)$stream] = \uv_poll_init_socket($this->uv, $stream); } @@ -252,6 +312,8 @@ private function addStream($stream) private function removeStream($stream) { + unset($this->allStreams[(int) $stream]); + if (!isset($this->streamEvents[(int) $stream])) { return; } diff --git a/src/StreamSelectLoop.php b/src/StreamSelectLoop.php index 3e6ff07f..669ff138 100644 --- a/src/StreamSelectLoop.php +++ b/src/StreamSelectLoop.php @@ -2,7 +2,6 @@ namespace React\EventLoop; -use React\EventLoop\Signal\Pcntl; use React\EventLoop\Tick\FutureTickQueue; use React\EventLoop\Timer\Timer; use React\EventLoop\Timer\Timers; @@ -49,7 +48,7 @@ * * @link http://php.net/manual/en/function.stream-select.php */ -final class StreamSelectLoop implements LoopInterface +final class StreamSelectLoop implements ExtLoopInterface { /** @internal */ const MICROSECONDS_PER_SECOND = 1000000; @@ -60,11 +59,16 @@ final class StreamSelectLoop implements LoopInterface private $readListeners = array(); private $writeStreams = array(); private $writeListeners = array(); + private $allStreams = array(); private $running; private $pcntl = false; private $pcntlActive = false; private $signals; + private $dereferences = array(); + private $derefTimers = 0; + private $derefStreams = 0; + public function __construct() { $this->futureTickQueue = new FutureTickQueue(); @@ -83,6 +87,7 @@ public function addReadStream($stream, $listener) $key = (int) $stream; if (!isset($this->readStreams[$key])) { + $this->allStreams[$key] = true; $this->readStreams[$key] = $stream; $this->readListeners[$key] = $listener; } @@ -93,6 +98,7 @@ public function addWriteStream($stream, $listener) $key = (int) $stream; if (!isset($this->writeStreams[$key])) { + $this->allStreams[$key] = true; $this->writeStreams[$key] = $stream; $this->writeListeners[$key] = $listener; } @@ -104,7 +110,8 @@ public function removeReadStream($stream) unset( $this->readStreams[$key], - $this->readListeners[$key] + $this->readListeners[$key], + $this->allStreams[$key] ); } @@ -114,7 +121,8 @@ public function removeWriteStream($stream) unset( $this->writeStreams[$key], - $this->writeListeners[$key] + $this->writeListeners[$key], + $this->allStreams[$key] ); } @@ -173,6 +181,60 @@ public function removeSignal($signal, $listener) } } + public function reference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefTimers--; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (isset($this->dereferences[$key])) { + unset($this->dereferences[$key]); + $this->derefStreams--; + } + } + } + + public function dereference($streamOrTimer) + { + if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) { + if (!$this->timers->contains($streamOrTimer)) { + throw new \InvalidArgumentException('Given timer is not part of this loop'); + } + + $key = \spl_object_hash($streamOrTimer); + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefTimers++; + } + } else { + $key = (int) $streamOrTimer; + + if (!isset($this->allStreams[$key])) { + throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop'); + } + + if (!isset($this->dereferences[$key])) { + $this->dereferences[$key] = true; + $this->derefStreams++; + } + } + } + public function run() { $this->running = true; @@ -182,12 +244,15 @@ public function run() $this->timers->tick(); + $hasStreams = \count($this->allStreams) > $this->derefStreams; + $hasTimers = $this->timers->count() > $this->derefTimers; + // Future-tick queue has pending callbacks ... if (!$this->running || !$this->futureTickQueue->isEmpty()) { $timeout = 0; // There is a pending timer, only block until it is due ... - } elseif ($scheduledAt = $this->timers->getFirst()) { + } elseif ($hasTimers && $scheduledAt = $this->timers->getFirst()) { $timeout = $scheduledAt - $this->timers->getTime(); if ($timeout < 0) { $timeout = 0; @@ -199,8 +264,8 @@ public function run() $timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout; } - // The only possible event is stream or signal activity, so wait forever ... - } elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) { + // The only possible event is stream activity, so wait forever ... + } elseif ($hasStreams) { $timeout = null; // There's nothing left to do ... @@ -220,7 +285,7 @@ public function stop() /** * Wait/check for stream activity, or until the next timer is due. * - * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever. + * @param int|null $timeout Activity timeout in microseconds, or null to wait forever. */ private function waitForStreamActivity($timeout) { @@ -260,9 +325,9 @@ private function waitForStreamActivity($timeout) * * @param array &$read An array of read streams to select upon. * @param array &$write An array of write streams to select upon. - * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever. + * @param int|null $timeout Activity timeout in microseconds, or null to wait forever. * - * @return integer|false The total number of streams that are ready for read/write. + * @return int|false The total number of streams that are ready for read/write. * Can return false if stream_select() is interrupted by a signal. */ private function streamSelect(array &$read, array &$write, $timeout) diff --git a/src/Timer/Timers.php b/src/Timer/Timers.php index 1d4ac9b7..93108432 100644 --- a/src/Timer/Timers.php +++ b/src/Timer/Timers.php @@ -61,7 +61,12 @@ public function getFirst() public function isEmpty() { - return \count($this->timers) === 0; + return $this->count() === 0; + } + + public function count() + { + return \count($this->timers); } public function tick() diff --git a/tests/AbstractLoopTest.php b/tests/AbstractLoopTest.php index 3d844382..ae0e36e8 100644 --- a/tests/AbstractLoopTest.php +++ b/tests/AbstractLoopTest.php @@ -2,10 +2,12 @@ namespace React\Tests\EventLoop; +use React\EventLoop\Timer\Timer; + abstract class AbstractLoopTest extends TestCase { /** - * @var \React\EventLoop\LoopInterface + * @var \React\EventLoop\ExtLoopInterface */ protected $loop; @@ -491,6 +493,10 @@ function () { public function testRemoveSignalNotRegisteredIsNoOp() { + if (!defined('SIGINT')) { + return $this->markTestSkipped('Signal test skipped because "SIGINT" is not defined.'); + } + $this->loop->removeSignal(SIGINT, function () { }); $this->assertTrue(true); } @@ -498,7 +504,7 @@ public function testRemoveSignalNotRegisteredIsNoOp() public function testSignal() { if (!function_exists('posix_kill') || !function_exists('posix_getpid')) { - $this->markTestSkipped('Signal test skipped because functions "posix_kill" and "posix_getpid" are missing.'); + return $this->markTestSkipped('Signal test skipped because functions "posix_kill" and "posix_getpid" are missing.'); } $called = false; @@ -590,6 +596,144 @@ public function testTimerIntervalCanBeFarInFuture() $this->assertRunFasterThan($this->tickTimeout); } + /** + * @expectedException InvalidArgumentException + */ + public function testReferenceUnknownTimer() + { + $timer = new Timer(0.1, function () { }, false); + $this->loop->reference($timer); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testDereferenceUnknownTimer() + { + $timer = new Timer(0.1, function () { }, false); + $this->loop->dereference($timer); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testReferenceUnknownStream() + { + list ($stream) = $this->createSocketPair(); + $this->loop->reference($stream); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testDereferenceUnknownStream() + { + list ($stream) = $this->createSocketPair(); + $this->loop->dereference($stream); + } + + public function testDereferenceTimer() + { + $timer = $this->loop->addTimer(0.01, $this->expectCallableNever()); + $this->loop->dereference($timer); + + $this->assertRunFasterThan(0.01); + } + + public function testDereferenceReferenceTimer() + { + $timer = $this->loop->addTimer(0.010001, $this->expectCallableOnce()); + $this->loop->dereference($timer); + $this->loop->reference($timer); + + $this->assertRunSlowerThan(0.01); + } + + public function testDereferenceTimerPlusNormalTimer() + { + $timer = $this->loop->addTimer(0.1, $this->expectCallableNever()); + $this->loop->dereference($timer); + + $this->loop->addTimer(0.03, $this->expectCallableOnce()); + $this->assertRunSlowerThan($this->tickTimeout); + } + + public function testDereferenceStreamInput() + { + list ($input) = $this->createSocketPair(); + + $this->loop->addWriteStream($input, $this->expectCallableNever()); + $this->loop->dereference($input); + + $this->assertRunFasterThan(0.01); + } + + public function testDereferenceReferenceStreamInput() + { + list ($input) = $this->createSocketPair(); + $call = $this->expectCallableOnce(); + $loop = $this->loop; + + $this->loop->addWriteStream($input, function () use (&$call, $input, $loop) { + $call && ($call() || $call = null); + $loop->removeWriteStream($input); + }); + $this->loop->dereference($input); + $this->loop->reference($input); + + $this->assertRunFasterThan(0.01); + } + + public function testDereferenceStreamOutput() + { + list ($input, $output) = $this->createSocketPair(); + + $this->loop->addReadStream($output, $this->expectCallableNever()); + $this->loop->dereference($output); + + $this->assertRunFasterThan(0.01); + } + + public function testDereferenceReferenceStreamOutput() + { + list ($input, $output) = $this->createSocketPair(); + $call = $this->expectCallableOnce(); + $loop = $this->loop; + + $this->loop->addReadStream($output, function () use (&$call, $output, $loop) { + $call && ($call() || $call = null); + $loop->removeReadStream($output); + }); + $this->loop->dereference($output); + $this->loop->reference($output); + + \fwrite($input, 'hello_world'); + $this->assertRunFasterThan(0.01); + } + + public function testDereferenceStreamInputPlusNormalStream() + { + list ($input, $output) = $this->createSocketPair(); + $read = $this->expectCallableOnce(); + $write = $this->expectCallableOnce(); + $loop = $this->loop; + + $this->loop->addWriteStream($input, function () use ($input, &$read) { + $read && ($read() || $read = null); + usleep(10); // we would be too fast for the timing + fwrite($input, 'hello_world'); + }); + $this->loop->dereference($input); + + $this->loop->addReadStream($output, function () use ($input, $output, $loop, &$write) { + $write && ($write() || $write = null); + $loop->removeWriteStream($input); + $loop->removeReadStream($output); + }); + + $this->assertRunSlowerThan(0.00001); + } + private function assertRunSlowerThan($minInterval) { $start = microtime(true);