Skip to content

Commit bf7a6ea

Browse files
committed
Migrate mongodb transport.
1 parent 16816ec commit bf7a6ea

11 files changed

+166
-281
lines changed

MongodbConnectionFactory.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Mongodb;
44

55
use Interop\Queue\PsrConnectionFactory;
6+
use Interop\Queue\PsrContext;
67
use MongoDB\Client;
78

89
class MongodbConnectionFactory implements PsrConnectionFactory
@@ -48,14 +49,17 @@ public function __construct($config = 'mongodb:')
4849
$this->config = $config;
4950
}
5051

51-
public function createContext()
52+
/**
53+
* @return MongodbContext
54+
*/
55+
public function createContext(): PsrContext
5256
{
5357
$client = new Client($this->config['dsn']);
5458

5559
return new MongodbContext($client, $this->config);
5660
}
5761

58-
public static function parseDsn($dsn)
62+
public static function parseDsn(string $dsn): array
5963
{
6064
$parsedUrl = parse_url($dsn);
6165
if (false === $parsedUrl) {

MongodbConsumer.php

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Interop\Queue\InvalidMessageException;
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
89

910
class MongodbConsumer implements PsrConsumer
1011
{
@@ -21,54 +22,44 @@ class MongodbConsumer implements PsrConsumer
2122
/**
2223
* @var int microseconds
2324
*/
24-
private $pollingInterval = 1000000;
25+
private $pollingInterval;
2526

26-
/**
27-
* @param MongodbContext $context
28-
* @param MongodbDestination $queue
29-
*/
3027
public function __construct(MongodbContext $context, MongodbDestination $queue)
3128
{
3229
$this->context = $context;
3330
$this->queue = $queue;
31+
32+
$this->pollingInterval = 1000;
3433
}
3534

3635
/**
3736
* Set polling interval in milliseconds.
38-
*
39-
* @param int $msec
4037
*/
41-
public function setPollingInterval($msec)
38+
public function setPollingInterval(int $msec): void
4239
{
43-
$this->pollingInterval = $msec * 1000;
40+
$this->pollingInterval = $msec;
4441
}
4542

4643
/**
4744
* Get polling interval in milliseconds.
48-
*
49-
* @return int
5045
*/
51-
public function getPollingInterval()
46+
public function getPollingInterval(): int
5247
{
53-
return (int) $this->pollingInterval / 1000;
48+
return $this->pollingInterval;
5449
}
5550

5651
/**
57-
* {@inheritdoc}
58-
*
5952
* @return MongodbDestination
6053
*/
61-
public function getQueue()
54+
public function getQueue(): PsrQueue
6255
{
6356
return $this->queue;
6457
}
6558

6659
/**
67-
* {@inheritdoc}
68-
*
69-
* @return MongodbMessage|null
60+
* @return MongodbMessage
7061
*/
71-
public function receive($timeout = 0)
62+
public function receive(int $timeout = 0): ?PsrMessage
7263
{
7364
$timeout /= 1000;
7465
$startAt = microtime(true);
@@ -81,43 +72,37 @@ public function receive($timeout = 0)
8172
}
8273

8374
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
84-
return;
75+
return null;
8576
}
8677

87-
usleep($this->pollingInterval);
78+
usleep($this->pollingInterval * 1000);
8879

8980
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
90-
return;
81+
return null;
9182
}
9283
}
9384
}
9485

9586
/**
96-
* {@inheritdoc}
97-
*
98-
* @return MongodbMessage|null
87+
* @return MongodbMessage
9988
*/
100-
public function receiveNoWait()
89+
public function receiveNoWait(): ?PsrMessage
10190
{
10291
return $this->receiveMessage();
10392
}
10493

10594
/**
106-
* {@inheritdoc}
107-
*
10895
* @param MongodbMessage $message
10996
*/
110-
public function acknowledge(PsrMessage $message)
97+
public function acknowledge(PsrMessage $message): void
11198
{
11299
// does nothing
113100
}
114101

115102
/**
116-
* {@inheritdoc}
117-
*
118103
* @param MongodbMessage $message
119104
*/
120-
public function reject(PsrMessage $message, $requeue = false)
105+
public function reject(PsrMessage $message, bool $requeue = false): void
121106
{
122107
InvalidMessageException::assertMessageInstanceOf($message, MongodbMessage::class);
123108

@@ -128,10 +113,7 @@ public function reject(PsrMessage $message, $requeue = false)
128113
}
129114
}
130115

131-
/**
132-
* @return MongodbMessage|null
133-
*/
134-
protected function receiveMessage()
116+
protected function receiveMessage(): ?MongodbMessage
135117
{
136118
$now = time();
137119
$collection = $this->context->getCollection();
@@ -155,14 +137,11 @@ protected function receiveMessage()
155137
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
156138
return $this->convertMessage($message);
157139
}
140+
141+
return null;
158142
}
159143

160-
/**
161-
* @param array $dbalMessage
162-
*
163-
* @return MongodbMessage
164-
*/
165-
protected function convertMessage(array $mongodbMessage)
144+
protected function convertMessage(array $mongodbMessage): MongodbMessage
166145
{
167146
$properties = JSON::decode($mongodbMessage['properties']);
168147
$headers = JSON::decode($mongodbMessage['headers']);

MongodbContext.php

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,18 @@
33
namespace Enqueue\Mongodb;
44

55
use Interop\Queue\InvalidDestinationException;
6+
use Interop\Queue\PsrConsumer;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrDestination;
9+
use Interop\Queue\PsrMessage;
10+
use Interop\Queue\PsrProducer;
11+
use Interop\Queue\PsrQueue;
12+
use Interop\Queue\PsrSubscriptionConsumer;
13+
use Interop\Queue\PsrTopic;
14+
use Interop\Queue\SubscriptionConsumerNotSupportedException;
15+
use Interop\Queue\TemporaryQueueNotSupportedException;
816
use MongoDB\Client;
17+
use MongoDB\Collection;
918

1019
class MongodbContext implements PsrContext
1120
{
@@ -30,7 +39,10 @@ public function __construct($client, array $config = [])
3039
$this->client = $client;
3140
}
3241

33-
public function createMessage($body = '', array $properties = [], array $headers = [])
42+
/**
43+
* @return MongodbMessage
44+
*/
45+
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
3446
{
3547
$message = new MongodbMessage();
3648
$message->setBody($body);
@@ -40,27 +52,41 @@ public function createMessage($body = '', array $properties = [], array $headers
4052
return $message;
4153
}
4254

43-
public function createTopic($name)
55+
/**
56+
* @return MongodbDestination
57+
*/
58+
public function createTopic(string $name): PsrTopic
4459
{
4560
return new MongodbDestination($name);
4661
}
4762

48-
public function createQueue($queueName)
63+
/**
64+
* @return MongodbDestination
65+
*/
66+
public function createQueue(string $queueName): PsrQueue
4967
{
5068
return new MongodbDestination($queueName);
5169
}
5270

53-
public function createTemporaryQueue()
71+
public function createTemporaryQueue(): PsrQueue
5472
{
55-
throw new \BadMethodCallException('Mongodb transport does not support temporary queues');
73+
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
5674
}
5775

58-
public function createProducer()
76+
/**
77+
* @return MongodbProducer
78+
*/
79+
public function createProducer(): PsrProducer
5980
{
6081
return new MongodbProducer($this);
6182
}
6283

63-
public function createConsumer(PsrDestination $destination)
84+
/**
85+
* @param MongodbDestination $destination
86+
*
87+
* @return MongodbConsumer
88+
*/
89+
public function createConsumer(PsrDestination $destination): PsrConsumer
6490
{
6591
InvalidDestinationException::assertDestinationInstanceOf($destination, MongodbDestination::class);
6692

@@ -73,35 +99,43 @@ public function createConsumer(PsrDestination $destination)
7399
return $consumer;
74100
}
75101

76-
public function close()
102+
public function close(): void
103+
{
104+
}
105+
106+
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
77107
{
78-
// TODO: Implement close() method.
108+
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
79109
}
80110

81-
public function getCollection()
111+
/**
112+
* @param MongodbDestination $queue
113+
*/
114+
public function purgeQueue(PsrQueue $queue): void
115+
{
116+
$this->getCollection()->deleteMany([
117+
'queue' => $queue->getQueueName(),
118+
]);
119+
}
120+
121+
public function getCollection(): Collection
82122
{
83123
return $this->client
84124
->selectDatabase($this->config['dbname'])
85125
->selectCollection($this->config['collection_name']);
86126
}
87127

88-
/**
89-
* @return Client
90-
*/
91-
public function getClient()
128+
public function getClient(): Client
92129
{
93130
return $this->client;
94131
}
95132

96-
/**
97-
* @return array
98-
*/
99-
public function getConfig()
133+
public function getConfig(): array
100134
{
101135
return $this->config;
102136
}
103137

104-
public function createCollection()
138+
public function createCollection(): void
105139
{
106140
$collection = $this->getCollection();
107141
$collection->createIndex(['priority' => -1, 'published_at' => 1], ['name' => 'enqueue_priority']);

MongodbDestination.php

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,22 @@ class MongodbDestination implements PsrTopic, PsrQueue
1212
*/
1313
private $destinationName;
1414

15-
/**
16-
* @param string $name
17-
*/
18-
public function __construct($name)
15+
public function __construct(string $name)
1916
{
2017
$this->destinationName = $name;
2118
}
2219

23-
/**
24-
* {@inheritdoc}
25-
*/
26-
public function getQueueName()
20+
public function getQueueName(): string
2721
{
2822
return $this->destinationName;
2923
}
3024

31-
/**
32-
* Alias for getQueueName()
33-
* {@inheritdoc}
34-
*/
35-
public function getName()
25+
public function getTopicName(): string
3626
{
37-
return $this->getQueueName();
27+
return $this->destinationName;
3828
}
3929

40-
/**
41-
* {@inheritdoc}
42-
*/
43-
public function getTopicName()
30+
public function getName(): string
4431
{
4532
return $this->destinationName;
4633
}

0 commit comments

Comments
 (0)