7
7
use Interop \Queue \Destination ;
8
8
use Interop \Queue \Exception \InvalidDestinationException ;
9
9
use Interop \Queue \Exception \InvalidMessageException ;
10
- use Interop \Queue \Exception \PriorityNotSupportedException ;
11
10
use Interop \Queue \Message ;
12
11
use Interop \Queue \Producer ;
13
12
use Pheanstalk \Pheanstalk ;
@@ -19,6 +18,21 @@ class PheanstalkProducer implements Producer
19
18
*/
20
19
private $ pheanstalk ;
21
20
21
+ /**
22
+ * @var int
23
+ */
24
+ private $ deliveryDelay ;
25
+
26
+ /**
27
+ * @var int
28
+ */
29
+ private $ priority ;
30
+
31
+ /**
32
+ * @var int
33
+ */
34
+ private $ timeToLive ;
35
+
22
36
public function __construct (Pheanstalk $ pheanstalk )
23
37
{
24
38
$ this ->pheanstalk = $ pheanstalk ;
@@ -35,18 +49,14 @@ public function send(Destination $destination, Message $message): void
35
49
36
50
$ rawMessage = json_encode ($ message );
37
51
if (JSON_ERROR_NONE !== json_last_error ()) {
38
- throw new \InvalidArgumentException (sprintf (
39
- 'Could not encode value into json. Error %s and message %s ' ,
40
- json_last_error (),
41
- json_last_error_msg ()
42
- ));
52
+ throw new \InvalidArgumentException (sprintf ('Could not encode value into json. Error %s and message %s ' , json_last_error (), json_last_error_msg ()));
43
53
}
44
54
45
55
$ this ->pheanstalk ->useTube ($ destination ->getName ())->put (
46
56
$ rawMessage ,
47
- $ message -> getPriority ( ),
48
- $ message -> getDelay ( ),
49
- $ message -> getTimeToRun ( )
57
+ $ this -> resolvePriority ( $ message ),
58
+ $ this -> resolveDelay ( $ message ),
59
+ $ this -> resolveTimeToLive ( $ message )
50
60
);
51
61
}
52
62
@@ -55,49 +65,79 @@ public function send(Destination $destination, Message $message): void
55
65
*/
56
66
public function setDeliveryDelay (int $ deliveryDelay = null ): Producer
57
67
{
58
- if (null === $ deliveryDelay ) {
59
- return $ this ;
60
- }
68
+ $ this ->deliveryDelay = $ deliveryDelay ;
61
69
62
- throw new \ LogicException ( ' Not implemented ' ) ;
70
+ return $ this ;
63
71
}
64
72
65
73
public function getDeliveryDelay (): ?int
66
74
{
67
- return null ;
75
+ return $ this -> deliveryDelay ;
68
76
}
69
77
70
78
/**
71
79
* @return PheanstalkProducer
72
80
*/
73
81
public function setPriority (int $ priority = null ): Producer
74
82
{
75
- if (null === $ priority ) {
76
- return $ this ;
77
- }
83
+ $ this ->priority = $ priority ;
78
84
79
- throw PriorityNotSupportedException:: providerDoestNotSupportIt () ;
85
+ return $ this ;
80
86
}
81
87
82
88
public function getPriority (): ?int
83
89
{
84
- return null ;
90
+ return $ this -> priority ;
85
91
}
86
92
87
93
/**
88
94
* @return PheanstalkProducer
89
95
*/
90
96
public function setTimeToLive (int $ timeToLive = null ): Producer
91
97
{
92
- if (null === $ timeToLive ) {
93
- return $ this ;
94
- }
98
+ $ this ->timeToLive = $ timeToLive ;
95
99
96
- throw new \ LogicException ( ' Not implemented ' ) ;
100
+ return $ this ;
97
101
}
98
102
99
103
public function getTimeToLive (): ?int
100
104
{
101
- return null ;
105
+ return $ this ->timeToLive ;
106
+ }
107
+
108
+ private function resolvePriority (PheanstalkMessage $ message ): ?int
109
+ {
110
+ if (null === $ this ->priority ) {
111
+ return $ message ->getPriority ();
112
+ }
113
+
114
+ $ priority = $ this ->priority ;
115
+ $ this ->priority = null ;
116
+
117
+ return $ priority ;
118
+ }
119
+
120
+ private function resolveDelay (PheanstalkMessage $ message ): ?int
121
+ {
122
+ if (null === $ this ->deliveryDelay ) {
123
+ return $ message ->getDelay ();
124
+ }
125
+
126
+ $ delay = $ this ->deliveryDelay ;
127
+ $ this ->deliveryDelay = null ;
128
+
129
+ return $ delay / 1000 ;
130
+ }
131
+
132
+ private function resolveTimeToLive (PheanstalkMessage $ message ): ?int
133
+ {
134
+ if (null === $ this ->timeToLive ) {
135
+ return $ message ->getTimeToRun ();
136
+ }
137
+
138
+ $ ttl = $ this ->timeToLive ;
139
+ $ this ->timeToLive = null ;
140
+
141
+ return $ ttl / 1000 ;
102
142
}
103
143
}
0 commit comments