Skip to content

Dereferencing/referencing of streams and timers #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions src/ExtEvLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* @see http://php.net/manual/en/book.ev.php
* @see https://bitbucket.org/osmanov/pecl-ev/overview
*/
class ExtEvLoop implements LoopInterface
class ExtEvLoop implements ExtLoopInterface
{
/**
* @var EvLoop
Expand All @@ -47,6 +47,11 @@ class ExtEvLoop implements LoopInterface
*/
private $writeStreams = array();

/**
* @var bool[]
*/
private $allStreams = array();

/**
* @var bool
*/
Expand All @@ -62,6 +67,10 @@ class ExtEvLoop implements LoopInterface
*/
private $signalEvents = array();

private $dereferences = array();
private $derefTimers = 0;
private $derefStreams = 0;

public function __construct()
{
$this->loop = new EvLoop();
Expand All @@ -81,6 +90,7 @@ public function addReadStream($stream, $listener)
$callback = $this->getStreamListenerClosure($stream, $listener);
$event = $this->loop->io($stream, Ev::READ, $callback);
$this->readStreams[$key] = $event;
$this->allStreams[$key] = true;
}

/**
Expand All @@ -107,6 +117,7 @@ public function addWriteStream($stream, $listener)
$callback = $this->getStreamListenerClosure($stream, $listener);
$event = $this->loop->io($stream, Ev::WRITE, $callback);
$this->writeStreams[$key] = $event;
$this->allStreams[$key] = true;
}

public function removeReadStream($stream)
Expand All @@ -118,7 +129,7 @@ public function removeReadStream($stream)
}

$this->readStreams[$key]->stop();
unset($this->readStreams[$key]);
unset($this->readStreams[$key], $this->allStreams[$key]);
}

public function removeWriteStream($stream)
Expand All @@ -130,7 +141,7 @@ public function removeWriteStream($stream)
}

$this->writeStreams[$key]->stop();
unset($this->writeStreams[$key]);
unset($this->writeStreams[$key], $this->allStreams[$key]);
}

public function addTimer($interval, $callback)
Expand Down Expand Up @@ -183,6 +194,60 @@ public function futureTick($listener)
$this->futureTickQueue->add($listener);
}

public function reference($streamOrTimer)
{
if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) {
if (!$this->timers->contains($streamOrTimer)) {
throw new \InvalidArgumentException('Given timer is not part of this loop');
}

$key = \spl_object_hash($streamOrTimer);

if (isset($this->dereferences[$key])) {
unset($this->dereferences[$key]);
$this->derefTimers--;
}
} else {
$key = (int) $streamOrTimer;

if (!isset($this->allStreams[$key])) {
throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop');
}

if (isset($this->dereferences[$key])) {
unset($this->dereferences[$key]);
$this->derefStreams--;
}
}
}

public function dereference($streamOrTimer)
{
if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) {
if (!$this->timers->contains($streamOrTimer)) {
throw new \InvalidArgumentException('Given timer is not part of this loop');
}

$key = \spl_object_hash($streamOrTimer);

if (!isset($this->dereferences[$key])) {
$this->dereferences[$key] = true;
$this->derefTimers++;
}
} else {
$key = (int) $streamOrTimer;

if (!isset($this->allStreams[$key])) {
throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop');
}

if (!isset($this->dereferences[$key])) {
$this->dereferences[$key] = true;
$this->derefStreams++;
}
}
}

public function run()
{
$this->running = true;
Expand All @@ -191,16 +256,15 @@ public function run()
$this->futureTickQueue->tick();

$hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
$hasStreams = \count($this->allStreams) > $this->derefStreams;
$hasTimers = $this->timers->count() > $this->derefTimers;

$wasJustStopped = !$this->running;
$nothingLeftToDo = !$this->readStreams
&& !$this->writeStreams
&& !$this->timers->count()
&& $this->signals->isEmpty();

$flags = Ev::RUN_ONCE;
if ($wasJustStopped || $hasPendingCallbacks) {
$flags |= Ev::RUN_NOWAIT;
} elseif ($nothingLeftToDo) {
} elseif (!$hasStreams && !$hasTimers) {
break;
}

Expand Down
74 changes: 70 additions & 4 deletions src/ExtEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
* @link https://pecl.php.net/package/event
*/
final class ExtEventLoop implements LoopInterface
final class ExtEventLoop implements ExtLoopInterface
{
private $eventBase;
private $futureTickQueue;
Expand All @@ -31,12 +31,17 @@ final class ExtEventLoop implements LoopInterface
private $writeEvents = array();
private $readListeners = array();
private $writeListeners = array();
private $allStreams = array();
private $readRefs = array();
private $writeRefs = array();
private $running;
private $signals;
private $signalEvents = array();

private $dereferences = array();
private $derefTimers = 0;
private $derefStreams = 0;

public function __construct()
{
if (!\class_exists('EventBase', false)) {
Expand Down Expand Up @@ -66,6 +71,7 @@ public function addReadStream($stream, $listener)
$event->add();
$this->readEvents[$key] = $event;
$this->readListeners[$key] = $listener;
$this->allStreams[$key] = true;

// ext-event does not increase refcount on stream resources for PHP 7+
// manually keep track of stream resource to prevent premature garbage collection
Expand All @@ -85,6 +91,7 @@ public function addWriteStream($stream, $listener)
$event->add();
$this->writeEvents[$key] = $event;
$this->writeListeners[$key] = $listener;
$this->allStreams[$key] = true;

// ext-event does not increase refcount on stream resources for PHP 7+
// manually keep track of stream resource to prevent premature garbage collection
Expand All @@ -102,7 +109,8 @@ public function removeReadStream($stream)
unset(
$this->readEvents[$key],
$this->readListeners[$key],
$this->readRefs[$key]
$this->readRefs[$key],
$this->allStreams[$key]
);
}
}
Expand All @@ -116,7 +124,8 @@ public function removeWriteStream($stream)
unset(
$this->writeEvents[$key],
$this->writeListeners[$key],
$this->writeRefs[$key]
$this->writeRefs[$key],
$this->allStreams[$key]
);
}
}
Expand Down Expand Up @@ -172,17 +181,74 @@ public function removeSignal($signal, $listener)
}
}

public function reference($streamOrTimer)
{
if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) {
if (!$this->timerEvents->contains($streamOrTimer)) {
throw new \InvalidArgumentException('Given timer is not part of this loop');
}

$key = \spl_object_hash($streamOrTimer);

if (isset($this->dereferences[$key])) {
unset($this->dereferences[$key]);
$this->derefTimers--;
}
} else {
$key = (int) $streamOrTimer;

if (!isset($this->allStreams[$key])) {
throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop');
}

if (isset($this->dereferences[$key])) {
unset($this->dereferences[$key]);
$this->derefStreams--;
}
}
}

public function dereference($streamOrTimer)
{
if ($streamOrTimer instanceof \React\EventLoop\TimerInterface) {
if (!$this->timerEvents->contains($streamOrTimer)) {
throw new \InvalidArgumentException('Given timer is not part of this loop');
}

$key = \spl_object_hash($streamOrTimer);

if (!isset($this->dereferences[$key])) {
$this->dereferences[$key] = true;
$this->derefTimers++;
}
} else {
$key = (int) $streamOrTimer;

if (!isset($this->allStreams[$key])) {
throw new \InvalidArgumentException('Given stream is not part of a read or write session of this loop');
}

if (!isset($this->dereferences[$key])) {
$this->dereferences[$key] = true;
$this->derefStreams++;
}
}
}

public function run()
{
$this->running = true;

while ($this->running) {
$this->futureTickQueue->tick();

$hasStreams = \count($this->allStreams) > $this->derefStreams;
$hasTimers = $this->timerEvents->count() > $this->derefTimers;

$flags = EventBase::LOOP_ONCE;
if (!$this->running || !$this->futureTickQueue->isEmpty()) {
$flags |= EventBase::LOOP_NONBLOCK;
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
} elseif (!$hasStreams && !$hasTimers && $this->signals->isEmpty()) {
break;
}

Expand Down
Loading