Skip to content

Commit 7d4c27f

Browse files
committed
MongoDB Subscription Consumer feature
1 parent e9d8364 commit 7d4c27f

8 files changed

+462
-18
lines changed

MongodbConsumer.php

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public function reject(Message $message, bool $requeue = false): void
115115
}
116116
}
117117

118-
protected function receiveMessage(): ?MongodbMessage
118+
private function receiveMessage(): ?MongodbMessage
119119
{
120120
$now = time();
121121
$collection = $this->context->getCollection();
@@ -137,23 +137,9 @@ protected function receiveMessage(): ?MongodbMessage
137137
return null;
138138
}
139139
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
140-
return $this->convertMessage($message);
140+
return MongodbMessage::fromArrayDbResult($message);
141141
}
142142

143143
return null;
144144
}
145-
146-
protected function convertMessage(array $mongodbMessage): MongodbMessage
147-
{
148-
$properties = JSON::decode($mongodbMessage['properties']);
149-
$headers = JSON::decode($mongodbMessage['headers']);
150-
151-
$message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers);
152-
$message->setId((string) $mongodbMessage['_id']);
153-
$message->setPriority((int) $mongodbMessage['priority']);
154-
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
155-
$message->setPublishedAt((int) $mongodbMessage['published_at']);
156-
157-
return $message;
158-
}
159145
}

MongodbContext.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
use Interop\Queue\Context;
99
use Interop\Queue\Destination;
1010
use Interop\Queue\Exception\InvalidDestinationException;
11-
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
1211
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
1312
use Interop\Queue\Message;
1413
use Interop\Queue\Producer;
@@ -107,7 +106,7 @@ public function close(): void
107106

108107
public function createSubscriptionConsumer(): SubscriptionConsumer
109108
{
110-
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
109+
return new MongodbSubscriptionConsumer($this);
111110
}
112111

113112
/**

MongodbMessage.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,22 @@ public function __construct(string $body = '', array $properties = [], array $he
6565
$this->redelivered = false;
6666
}
6767

68+
public static function fromArrayDbResult(array $arrayResult): self
69+
{
70+
$message = new self(
71+
$arrayResult['body'],
72+
JSON::decode($arrayResult['properties']),
73+
JSON::decode($arrayResult['headers'])
74+
);
75+
76+
$message->setId((string) $arrayResult['_id']);
77+
$message->setPriority((int) $arrayResult['priority']);
78+
$message->setRedelivered((bool) $arrayResult['redelivered']);
79+
$message->setPublishedAt((int) $arrayResult['published_at']);
80+
81+
return $message;
82+
}
83+
6884
public function setId(string $id = null): void
6985
{
7086
$this->id = $id;

MongodbSubscriptionConsumer.php

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Mongodb;
6+
7+
use Interop\Queue\Consumer;
8+
use Interop\Queue\SubscriptionConsumer;
9+
10+
class MongodbSubscriptionConsumer implements SubscriptionConsumer
11+
{
12+
/**
13+
* @var MongodbContext
14+
*/
15+
private $context;
16+
17+
/**
18+
* an item contains an array: [MongodbConsumer $consumer, callable $callback];.
19+
*
20+
* @var array
21+
*/
22+
private $subscribers;
23+
24+
/**
25+
* @param MongodbContext $context
26+
*/
27+
public function __construct(MongodbContext $context)
28+
{
29+
$this->context = $context;
30+
$this->subscribers = [];
31+
}
32+
33+
public function consume(int $timeout = 0): void
34+
{
35+
if (empty($this->subscribers)) {
36+
throw new \LogicException('No subscribers');
37+
}
38+
39+
$timeout = (int) ceil($timeout / 1000);
40+
$endAt = time() + $timeout;
41+
42+
$queueNames = [];
43+
foreach (array_keys($this->subscribers) as $queueName) {
44+
$queueNames[$queueName] = $queueName;
45+
}
46+
47+
$currentQueueNames = [];
48+
while (true) {
49+
if (empty($currentQueueNames)) {
50+
$currentQueueNames = $queueNames;
51+
}
52+
53+
$result = $this->context->getCollection()->findOneAndDelete(
54+
[
55+
'queue' => ['$in' => array_keys($currentQueueNames)],
56+
'$or' => [
57+
['delayed_until' => ['$exists' => false]],
58+
['delayed_until' => ['$lte' => time()]],
59+
],
60+
],
61+
[
62+
'sort' => ['priority' => -1, 'published_at' => 1],
63+
'typeMap' => ['root' => 'array', 'document' => 'array'],
64+
]
65+
);
66+
67+
if ($result) {
68+
list($consumer, $callback) = $this->subscribers[$result['queue']];
69+
70+
$message = MongodbMessage::fromArrayDbResult($result);
71+
72+
if (false === call_user_func($callback, $message, $consumer)) {
73+
return;
74+
}
75+
76+
unset($currentQueueNames[$result['queue']]);
77+
} else {
78+
$currentQueueNames = [];
79+
}
80+
81+
if ($timeout && microtime(true) >= $endAt) {
82+
return;
83+
}
84+
}
85+
}
86+
87+
/**
88+
* @param MongodbConsumer $consumer
89+
*/
90+
public function subscribe(Consumer $consumer, callable $callback): void
91+
{
92+
if (false == $consumer instanceof MongodbConsumer) {
93+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
94+
}
95+
96+
$queueName = $consumer->getQueue()->getQueueName();
97+
if (array_key_exists($queueName, $this->subscribers)) {
98+
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
99+
return;
100+
}
101+
102+
throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
103+
}
104+
105+
$this->subscribers[$queueName] = [$consumer, $callback];
106+
}
107+
108+
/**
109+
* @param MongodbConsumer $consumer
110+
*/
111+
public function unsubscribe(Consumer $consumer): void
112+
{
113+
if (false == $consumer instanceof MongodbConsumer) {
114+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
115+
}
116+
117+
$queueName = $consumer->getQueue()->getQueueName();
118+
119+
if (false == array_key_exists($queueName, $this->subscribers)) {
120+
return;
121+
}
122+
123+
if ($this->subscribers[$queueName][0] !== $consumer) {
124+
return;
125+
}
126+
127+
unset($this->subscribers[$queueName]);
128+
}
129+
130+
public function unsubscribeAll(): void
131+
{
132+
$this->subscribers = [];
133+
}
134+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Mongodb\Tests;
6+
7+
use Enqueue\Mongodb\MongodbConsumer;
8+
use Enqueue\Mongodb\MongodbContext;
9+
use Enqueue\Mongodb\MongodbSubscriptionConsumer;
10+
use Interop\Queue\Consumer;
11+
use Interop\Queue\Queue;
12+
use Interop\Queue\SubscriptionConsumer;
13+
use PHPUnit\Framework\TestCase;
14+
15+
class MongodbSubscriptionConsumerTest extends TestCase
16+
{
17+
public function testShouldImplementSubscriptionConsumerInterface()
18+
{
19+
$rc = new \ReflectionClass(MongodbSubscriptionConsumer::class);
20+
21+
$this->assertTrue($rc->implementsInterface(SubscriptionConsumer::class));
22+
}
23+
24+
public function testCouldBeConstructedWithMongodbContextAsFirstArgument()
25+
{
26+
new MongodbSubscriptionConsumer($this->createMongodbContextMock());
27+
}
28+
29+
public function testShouldAddConsumerAndCallbackToSubscribersPropertyOnSubscribe()
30+
{
31+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
32+
33+
$fooCallback = function () {};
34+
$fooConsumer = $this->createConsumerStub('foo_queue');
35+
36+
$barCallback = function () {};
37+
$barConsumer = $this->createConsumerStub('bar_queue');
38+
39+
$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
40+
$subscriptionConsumer->subscribe($barConsumer, $barCallback);
41+
42+
$this->assertAttributeSame([
43+
'foo_queue' => [$fooConsumer, $fooCallback],
44+
'bar_queue' => [$barConsumer, $barCallback],
45+
], 'subscribers', $subscriptionConsumer);
46+
}
47+
48+
public function testThrowsIfTrySubscribeAnotherConsumerToAlreadySubscribedQueue()
49+
{
50+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
51+
52+
$fooCallback = function () {};
53+
$fooConsumer = $this->createConsumerStub('foo_queue');
54+
55+
$barCallback = function () {};
56+
$barConsumer = $this->createConsumerStub('foo_queue');
57+
58+
$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
59+
60+
$this->expectException(\InvalidArgumentException::class);
61+
$this->expectExceptionMessage('There is a consumer subscribed to queue: "foo_queue"');
62+
$subscriptionConsumer->subscribe($barConsumer, $barCallback);
63+
}
64+
65+
public function testShouldAllowSubscribeSameConsumerAndCallbackSecondTime()
66+
{
67+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
68+
69+
$fooCallback = function () {};
70+
$fooConsumer = $this->createConsumerStub('foo_queue');
71+
72+
$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
73+
$subscriptionConsumer->subscribe($fooConsumer, $fooCallback);
74+
}
75+
76+
public function testShouldRemoveSubscribedConsumerOnUnsubscribeCall()
77+
{
78+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
79+
80+
$fooConsumer = $this->createConsumerStub('foo_queue');
81+
$barConsumer = $this->createConsumerStub('bar_queue');
82+
83+
$subscriptionConsumer->subscribe($fooConsumer, function () {});
84+
$subscriptionConsumer->subscribe($barConsumer, function () {});
85+
86+
// guard
87+
$this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);
88+
89+
$subscriptionConsumer->unsubscribe($fooConsumer);
90+
91+
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
92+
}
93+
94+
public function testShouldDoNothingIfTryUnsubscribeNotSubscribedQueueName()
95+
{
96+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
97+
98+
$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
99+
100+
// guard
101+
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
102+
103+
$subscriptionConsumer->unsubscribe($this->createConsumerStub('bar_queue'));
104+
105+
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
106+
}
107+
108+
public function testShouldDoNothingIfTryUnsubscribeNotSubscribedConsumer()
109+
{
110+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
111+
112+
$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
113+
114+
// guard
115+
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
116+
117+
$subscriptionConsumer->unsubscribe($this->createConsumerStub('foo_queue'));
118+
119+
$this->assertAttributeCount(1, 'subscribers', $subscriptionConsumer);
120+
}
121+
122+
public function testShouldRemoveAllSubscriberOnUnsubscribeAllCall()
123+
{
124+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
125+
126+
$subscriptionConsumer->subscribe($this->createConsumerStub('foo_queue'), function () {});
127+
$subscriptionConsumer->subscribe($this->createConsumerStub('bar_queue'), function () {});
128+
129+
// guard
130+
$this->assertAttributeCount(2, 'subscribers', $subscriptionConsumer);
131+
132+
$subscriptionConsumer->unsubscribeAll();
133+
134+
$this->assertAttributeCount(0, 'subscribers', $subscriptionConsumer);
135+
}
136+
137+
public function testThrowsIfTryConsumeWithoutSubscribers()
138+
{
139+
$subscriptionConsumer = new MongodbSubscriptionConsumer($this->createMongodbContextMock());
140+
141+
$this->expectException(\LogicException::class);
142+
$this->expectExceptionMessage('No subscribers');
143+
144+
$subscriptionConsumer->consume();
145+
}
146+
147+
/**
148+
* @return MongodbContext|\PHPUnit_Framework_MockObject_MockObject
149+
*/
150+
private function createMongodbContextMock()
151+
{
152+
return $this->createMock(MongodbContext::class);
153+
}
154+
155+
/**
156+
* @param null|mixed $queueName
157+
*
158+
* @return Consumer|\PHPUnit_Framework_MockObject_MockObject
159+
*/
160+
private function createConsumerStub($queueName = null)
161+
{
162+
$queueMock = $this->createMock(Queue::class);
163+
$queueMock
164+
->expects($this->any())
165+
->method('getQueueName')
166+
->willReturn($queueName);
167+
168+
$consumerMock = $this->createMock(MongodbConsumer::class);
169+
$consumerMock
170+
->expects($this->any())
171+
->method('getQueue')
172+
->willReturn($queueMock)
173+
;
174+
175+
return $consumerMock;
176+
}
177+
}

0 commit comments

Comments
 (0)