Skip to content

Commit b1e8117

Browse files
author
Mathieu Lemoine
committed
Reduced dependency to voryx/Thruway
1 parent 374323f commit b1e8117

22 files changed

+401
-194
lines changed

Consumption/Extension/DoctrinePingConnectionExtension.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,22 @@
22

33
namespace Enqueue\Bundle\Consumption\Extension;
44

5+
use Doctrine\Common\Persistence\ManagerRegistry;
56
use Doctrine\DBAL\Connection;
67
use Enqueue\Consumption\Context\MessageReceived;
78
use Enqueue\Consumption\MessageReceivedExtensionInterface;
8-
use Symfony\Bridge\Doctrine\RegistryInterface;
99

1010
class DoctrinePingConnectionExtension implements MessageReceivedExtensionInterface
1111
{
1212
/**
13-
* @var RegistryInterface
13+
* @var ManagerRegistry
1414
*/
1515
protected $registry;
1616

1717
/**
18-
* @param RegistryInterface $registry
18+
* @param ManagerRegistry $registry
1919
*/
20-
public function __construct(RegistryInterface $registry)
20+
public function __construct(ManagerRegistry $registry)
2121
{
2222
$this->registry = $registry;
2323
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Profiler;
4+
5+
use Enqueue\Client\MessagePriority;
6+
use Enqueue\Client\ProducerInterface;
7+
use Enqueue\Client\TraceableProducer;
8+
use Enqueue\Util\JSON;
9+
use Symfony\Component\HttpFoundation\Request;
10+
use Symfony\Component\HttpFoundation\Response;
11+
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
12+
13+
abstract class AbstractMessageQueueCollector extends DataCollector
14+
{
15+
/**
16+
* @var ProducerInterface
17+
*/
18+
protected $producers;
19+
20+
public function addProducer(string $name, ProducerInterface $producer): void
21+
{
22+
$this->producers[$name] = $producer;
23+
}
24+
25+
protected function collectInternal(Request $request, Response $response): void
26+
{
27+
$this->data = [];
28+
29+
foreach ($this->producers as $name => $producer) {
30+
if ($producer instanceof TraceableProducer) {
31+
$this->data[$name] = $producer->getTraces();
32+
}
33+
}
34+
}
35+
36+
public function getCount(): int
37+
{
38+
$count = 0;
39+
foreach ($this->data as $name => $messages) {
40+
$count += count($messages);
41+
}
42+
43+
return $count;
44+
}
45+
46+
/**
47+
* @return array
48+
*/
49+
public function getSentMessages()
50+
{
51+
return $this->data;
52+
}
53+
54+
/**
55+
* @param string $priority
56+
*
57+
* @return string
58+
*/
59+
public function prettyPrintPriority($priority)
60+
{
61+
$map = [
62+
MessagePriority::VERY_LOW => 'very low',
63+
MessagePriority::LOW => 'low',
64+
MessagePriority::NORMAL => 'normal',
65+
MessagePriority::HIGH => 'high',
66+
MessagePriority::VERY_HIGH => 'very high',
67+
];
68+
69+
return isset($map[$priority]) ? $map[$priority] : $priority;
70+
}
71+
72+
/**
73+
* @param mixed $body
74+
*
75+
* @return string
76+
*/
77+
public function ensureString($body)
78+
{
79+
return is_string($body) ? $body : JSON::encode($body);
80+
}
81+
82+
/**
83+
* {@inheritdoc}
84+
*/
85+
public function getName()
86+
{
87+
return 'enqueue.message_queue';
88+
}
89+
90+
/**
91+
* {@inheritdoc}
92+
*/
93+
public function reset()
94+
{
95+
$this->data = [];
96+
}
97+
}

Profiler/MessageQueueCollector.php

Lines changed: 17 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -2,99 +2,30 @@
22

33
namespace Enqueue\Bundle\Profiler;
44

5-
use Enqueue\Client\MessagePriority;
6-
use Enqueue\Client\ProducerInterface;
7-
use Enqueue\Client\TraceableProducer;
8-
use Enqueue\Util\JSON;
95
use Symfony\Component\HttpFoundation\Request;
106
use Symfony\Component\HttpFoundation\Response;
11-
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
7+
use Symfony\Component\HttpKernel\Kernel;
128

13-
class MessageQueueCollector extends DataCollector
14-
{
15-
/**
16-
* @var ProducerInterface
17-
*/
18-
private $producers;
19-
20-
public function addProducer(string $name, ProducerInterface $producer): void
21-
{
22-
$this->producers[$name] = $producer;
23-
}
24-
25-
/**
26-
* {@inheritdoc}
27-
*/
28-
public function collect(Request $request, Response $response, \Exception $exception = null)
9+
if (Kernel::MAJOR_VERSION < 5) {
10+
class MessageQueueCollector extends AbstractMessageQueueCollector
2911
{
30-
$this->data = [];
31-
32-
foreach ($this->producers as $name => $producer) {
33-
if ($producer instanceof TraceableProducer) {
34-
$this->data[$name] = $producer->getTraces();
35-
}
12+
/**
13+
* {@inheritdoc}
14+
*/
15+
public function collect(Request $request, Response $response, \Exception $exception = null)
16+
{
17+
$this->collectInternal($request, $response);
3618
}
3719
}
38-
39-
public function getCount(): int
20+
} else {
21+
class MessageQueueCollector extends AbstractMessageQueueCollector
4022
{
41-
$count = 0;
42-
foreach ($this->data as $name => $messages) {
43-
$count += count($messages);
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function collect(Request $request, Response $response, \Throwable $exception = null)
27+
{
28+
$this->collectInternal($request, $response);
4429
}
45-
46-
return $count;
47-
}
48-
49-
/**
50-
* @return array
51-
*/
52-
public function getSentMessages()
53-
{
54-
return $this->data;
55-
}
56-
57-
/**
58-
* @param string $priority
59-
*
60-
* @return string
61-
*/
62-
public function prettyPrintPriority($priority)
63-
{
64-
$map = [
65-
MessagePriority::VERY_LOW => 'very low',
66-
MessagePriority::LOW => 'low',
67-
MessagePriority::NORMAL => 'normal',
68-
MessagePriority::HIGH => 'high',
69-
MessagePriority::VERY_HIGH => 'very high',
70-
];
71-
72-
return isset($map[$priority]) ? $map[$priority] : $priority;
73-
}
74-
75-
/**
76-
* @param mixed $body
77-
*
78-
* @return string
79-
*/
80-
public function ensureString($body)
81-
{
82-
return is_string($body) ? $body : JSON::encode($body);
83-
}
84-
85-
/**
86-
* {@inheritdoc}
87-
*/
88-
public function getName()
89-
{
90-
return 'enqueue.message_queue';
91-
}
92-
93-
/**
94-
* {@inheritdoc}
95-
*/
96-
public function reset()
97-
{
98-
$this->data = [];
9930
}
10031
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional\App;
4+
5+
use Enqueue\AsyncEventDispatcher\Commands;
6+
use Enqueue\AsyncEventDispatcher\Registry;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\ProducerInterface;
9+
use Symfony\Component\EventDispatcher\Event;
10+
use Symfony\Contracts\EventDispatcher\Event as ContractEvent;
11+
12+
abstract class AbstractAsyncListener extends \Enqueue\AsyncEventDispatcher\AsyncListener
13+
{
14+
/**
15+
* @var ProducerInterface
16+
*/
17+
protected $producer;
18+
19+
/**
20+
* @var Registry
21+
*/
22+
protected $registry;
23+
24+
/**
25+
* @param ProducerInterface $producer
26+
* @param Registry $registry
27+
*/
28+
public function __construct(ProducerInterface $producer, Registry $registry)
29+
{
30+
$this->producer = $producer;
31+
$this->registry = $registry;
32+
}
33+
34+
/**
35+
* @param Event|ContractEvent $event
36+
* @param string $eventName
37+
*/
38+
protected function onEventInternal($event, $eventName)
39+
{
40+
if (false == $this->isSyncMode($eventName)) {
41+
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
42+
43+
$interopMessage = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
44+
$message = new Message($interopMessage->getBody());
45+
$message->setScope(Message::SCOPE_APP);
46+
$message->setProperty('event_name', $eventName);
47+
$message->setProperty('transformer_name', $transformerName);
48+
49+
$this->producer->sendCommand(Commands::DISPATCH_ASYNC_EVENTS, $message);
50+
}
51+
}
52+
}

Tests/Functional/App/AsyncListener.php

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,53 @@
22

33
namespace Enqueue\Bundle\Tests\Functional\App;
44

5-
use Enqueue\AsyncEventDispatcher\Commands;
6-
use Enqueue\AsyncEventDispatcher\Registry;
7-
use Enqueue\Client\Message;
8-
use Enqueue\Client\ProducerInterface;
95
use Symfony\Component\EventDispatcher\Event;
6+
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
7+
use Symfony\Contracts\EventDispatcher\Event as ContractEvent;
108

11-
class AsyncListener extends \Enqueue\AsyncEventDispatcher\AsyncListener
12-
{
9+
if (class_exists(Event::class) && !class_exists(LegacyEventDispatcherProxy::class)) {
1310
/**
14-
* @var ProducerInterface
11+
* Symfony < 4.3
1512
*/
16-
private $producer;
17-
18-
/**
19-
* @var Registry
20-
*/
21-
private $registry;
22-
13+
class AsyncListener extends AbstractAsyncListener
14+
{
15+
/**
16+
* @param Event $event
17+
* @param string $eventName
18+
*/
19+
public function onEvent(Event $event, $eventName)
20+
{
21+
$this->onEventInternal($event, $eventName);
22+
}
23+
}
24+
} elseif (class_exists(Event::class)) {
2325
/**
24-
* @param ProducerInterface $producer
25-
* @param Registry $registry
26+
* Symfony >= 4.3 and < 5.0
2627
*/
27-
public function __construct(ProducerInterface $producer, Registry $registry)
28+
class AsyncListener extends AbstractAsyncListener
2829
{
29-
$this->producer = $producer;
30-
$this->registry = $registry;
30+
/**
31+
* @param Event|ContractEvent $event
32+
* @param string $eventName
33+
*/
34+
public function onEvent($event, $eventName)
35+
{
36+
$this->onEventInternal($event, $eventName);
37+
}
3138
}
32-
39+
} else {
3340
/**
34-
* @param Event $event
35-
* @param string $eventName
41+
* Symfony >= 5.0
3642
*/
37-
public function onEvent(Event $event = null, $eventName)
43+
class AsyncListener extends AbstractAsyncListener
3844
{
39-
if (false == $this->isSyncMode($eventName)) {
40-
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
41-
42-
$interopMessage = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
43-
$message = new Message($interopMessage->getBody());
44-
$message->setScope(Message::SCOPE_APP);
45-
$message->setProperty('event_name', $eventName);
46-
$message->setProperty('transformer_name', $transformerName);
47-
48-
$this->producer->sendCommand(Commands::DISPATCH_ASYNC_EVENTS, $message);
45+
/**
46+
* @param ContractEvent $event
47+
* @param string $eventName
48+
*/
49+
public function onEvent(ContractEvent $event, $eventName)
50+
{
51+
$this->onEventInternal($event, $eventName);
4952
}
5053
}
5154
}

0 commit comments

Comments
 (0)