Skip to content

Commit c0b6143

Browse files
viktorproggersamdarkStyleCIBot
authored
Message failure handling middleware pipeline (yiisoft#64)
* Initial implementation * Apply fixes from StyleCI * General implementation * Make internal method private * Fix tests * remove redundant use * The first test * Start simple strategy tests with data provider * Create SendAgainStrategy * Upgrade PayloadFactory * Upgrade existing strategy test * Rename BehaviorRemovingStrategy * Apply fixes from StyleCI * Create queue sending test draft * More strategy tests * Divide different tests into different classes * Remove redundant tests * Apply fixes from StyleCI * Remove unused uses * Fix dispatcher creating * Create FailedJobsHandler instead of Queue::jobRetry() * Add typehint to property * Make DispatcherFactory final * Add PhpDoc with type to a property * Disallow empty default pipeline in DispatcherFactory * Apply fixes from StyleCI * Allow zero initial delay * Add integrational failure strategy test * Divide namespaces * Add dispatcher tests * Improve strategy tests * Apply fixes from StyleCI * Add BehaviorAddingStrategy * Apply fixes from StyleCI * Actualizing * Apply fixes from StyleCI * Actualizing * Default configuration * Pipeline fix * Apply fixes from StyleCI * FailureStrategyFactory fix * FailureStrategyFactory fix * Bugfixes and tests * Fix more tests * Apply fixes from StyleCI * Fix types in a test * Cosmetics * Cosmetics * Remove redundant test * One more test * Apply fixes from StyleCI * Cosmetics * Test coverage increasing and bugfixes * Docs and bugfixes * Apply fixes from StyleCI * Exception message dots * PhpDoc * Apply fixes from StyleCI * Fix PhpDoc * Apply fixes from StyleCI * Convert Fail Strategies into middlewares * Convert into middlewares, iteration 1 * Apply fixes from StyleCI * Fix tests and naming * Apply fixes from StyleCI * Bugfix * Apply fixes from StyleCI * Style fix * More tests and bugfixes Co-authored-by: StyleCI Bot <[email protected]> * Docs, configs and bugfixes * Apply fixes from StyleCI * Improve exception messages * Bugfix * Apply docs suggestions from code review Co-authored-by: Alexander Makarov <[email protected]> * Fix PhpDocs Co-authored-by: Alexander Makarov <[email protected]> Co-authored-by: StyleCI Bot <[email protected]> Co-authored-by: Alexander Makarov <[email protected]>
1 parent 376c265 commit c0b6143

35 files changed

+1860
-41
lines changed

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ twice for a queue message. That means you can add extra functionality on message
215215
of the two classes: `PushMiddlewareDispatcher` and `ConsumeMiddlewareDispatcher` respectively.
216216

217217
You can use any of these formats to define a middleware:
218-
- A ready-to-use middleware object: `new FooMiddleware()`. It must implement either `MiddlewarePushInterface`,
219-
or `MiddlewareConsumeInterface` depending on the place you use it.
218+
- A ready-to-use middleware object: `new FooMiddleware()`. It must implement `MiddlewarePushInterface`,
219+
`MiddlewareConsumeInterface` or `MiddlewareFailureInterface` depending on the place you use it.
220220
- An array in the format of [yiisoft/definitions](https://github.com/yiisoft/definitions).
221221
**Only if you use yiisoft/definitions and yiisoft/di**.
222222
- A `callable`: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the
@@ -256,7 +256,9 @@ in the `PushRequest` object. You will get a `AdapterNotConfiguredException`, if
256256
You have three places to define push middlewares:
257257

258258
1. `PushMiddlewareDispatcher`. You can pass it either to the constructor, or to the `withMiddlewares()` method, which
259-
creates a completely new dispatcher object with only those middlewares, which are passed as arguments.
259+
creates a completely new dispatcher object with only those middlewares, which are passed as arguments.
260+
If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-push` key of the
261+
`yiisoft/yii-queue` array in the `params`.
260262
2. Pass middlewares to either `Queue::withMiddlewares()` or `Queue::withMiddlewaresAdded()` methods. The difference is
261263
that the former will completely replace an existing middleware stack, while the latter will add passed middlewares to
262264
the end of the existing stack. These middlewares will be executed after the common ones, passed directly to the
@@ -269,7 +271,17 @@ along with them.
269271

270272
### Consume pipeline
271273

272-
You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Unless Push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method.
274+
You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Unless Push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-consume` key of the `yiisoft/yii-queue` array in the `params`.
275+
276+
### Error handling pipeline
277+
278+
Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/yii-queue` with Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines:
279+
- You should set up the middleware pipeline separately for each queue channel. That means, the format should be `['channel-name' => [FooMiddleware::class]]` instead of `[FooMiddleware::class]`, like for the other two pipelines. There is also a default key, which will be used for those channels without their own one: `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`.
280+
- The last middleware will throw the exception, which will come with the `FailureHandlingRequest` object. If you don't want the exception to be thrown, your middlewares should `return` a request without calling `$handler->handleFailure()`.
281+
282+
You can declare error handling middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-fail` key of the `yiisoft/yii-queue` array in the `params`.
283+
284+
See [error handling docs](docs/guide/error-handling.md) for details.
273285

274286
## Extra
275287

config/common.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@
66
use Yiisoft\Yii\Queue\Cli\LoopInterface;
77
use Yiisoft\Yii\Queue\Cli\SignalLoop;
88
use Yiisoft\Yii\Queue\Cli\SimpleLoop;
9+
use Yiisoft\Yii\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
910
use Yiisoft\Yii\Queue\Middleware\Consume\MiddlewareFactoryConsume;
1011
use Yiisoft\Yii\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
12+
use Yiisoft\Yii\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
13+
use Yiisoft\Yii\Queue\Middleware\FailureHandling\MiddlewareFactoryFailure;
14+
use Yiisoft\Yii\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface;
1115
use Yiisoft\Yii\Queue\Middleware\Push\MiddlewareFactoryPush;
1216
use Yiisoft\Yii\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
17+
use Yiisoft\Yii\Queue\Middleware\Push\PushMiddlewareDispatcher;
1318
use Yiisoft\Yii\Queue\Queue;
1419
use Yiisoft\Yii\Queue\QueueFactory;
1520
use Yiisoft\Yii\Queue\QueueFactoryInterface;
@@ -37,4 +42,14 @@
3742
QueueInterface::class => Queue::class,
3843
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
3944
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
45+
MiddlewareFactoryFailureInterface::class => MiddlewareFactoryFailure::class,
46+
PushMiddlewareDispatcher::class => [
47+
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/yii-queue']['middlewares-push']],
48+
],
49+
ConsumeMiddlewareDispatcher::class => [
50+
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/yii-queue']['middlewares-consume']],
51+
],
52+
FailureMiddlewareDispatcher::class => [
53+
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/yii-queue']['middlewares-fail']],
54+
],
4055
];

config/events-console.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use Yiisoft\Yii\Queue\Event\JobFailure;
6+
use Yiisoft\Yii\Queue\FailureStrategy\FailedJobsHandler;
7+
8+
return [
9+
JobFailure::class => [
10+
[FailedJobsHandler::class, 'handle'],
11+
],
12+
];

config/params.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,8 @@
1515
'yiisoft/yii-queue' => [
1616
'handlers' => [],
1717
'channel-definitions' => [],
18+
'middlewares-push' => [],
19+
'middlewares-consume' => [],
20+
'middlewares-fail' => [],
1821
],
1922
];

docs/guide/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ An extension for running tasks asynchronously via queues.
66

77
* [Usage basics](usage.md)
88
* [Migrating from `yii2-queue`](migrating-from-yii2-queue.md)
9-
* [Errors and retryable jobs](retryable.md)
9+
* [Errors and retryable jobs](error-handling.md)
1010
* [Workers](worker.md)
1111
* [Adapter list](adapter-list.md)

docs/guide/error-handling.md

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Error handling on message processing
2+
3+
Often when some message handling is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/yii-queue` with _Failure Handling Middleware Pipeline_. It is triggered each time message processing via Consume Middleware Pipeline is interrupted with any `Throwable`.
4+
5+
## Configuration
6+
7+
Here below is configuration via `yiisoft/config`. If you don't use it - you should add middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own.
8+
9+
Configuration should be passed to the `yiisoft/yii-queue.fail-strategy-pipelines` key of the `params` config to work with the `yiisoft/config`. You can define different failure handling pipelines for each queue channel. Let's see and describe an example:
10+
11+
```php
12+
'yiisoft/yii-queue' => [
13+
'middlewares-fail' => [
14+
FailureMiddlewareDispatcher::DEFAULT_PIPELINE => [
15+
[
16+
'class' => SendAgainMiddleware::class,
17+
'__construct()' => ['id' => 'default-first-resend', 'queue' => null],
18+
],
19+
static fn (QueueFactoryInterface $factory) => new SendAgainMiddleware(
20+
id: 'default-second-resend',
21+
queue: $factory->get('failed-messages'),
22+
),
23+
],
24+
25+
'failed-messages' => [
26+
[
27+
'class' => ExponentialDelayMiddleware::class,
28+
'__construct()' => [
29+
'id' => 'failed-messages',
30+
'maxAttempts' => 30,
31+
'delayInitial' => 5,
32+
'delayMaximum' => 60,
33+
'exponent' => 1.5,
34+
'queue' => null,
35+
],
36+
],
37+
],
38+
],
39+
]
40+
```
41+
42+
Keys here except `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` are queue channel names, and values are lists of `FailureMiddlewareInterface` definitions. `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` defines a default pipeline to apply to channels without explicitly defined failure strategy pipeline. Each middleware definition must be one of:
43+
- A ready-to-use `MiddlewareFailureInterface` object like `new FooMiddleware()`.
44+
- A valid definition for the [yiisoft/definitions](https://github.com/yiisoft/definitions). It must describe an object, implementing the `MiddlewareFailureInterface`.
45+
- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the `yiisoft/injector`, so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`.
46+
- A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`.
47+
48+
In the example above failures will be handled this way (look the concrete middleware description below):
49+
50+
1. For the first time message will be resent to the same queue channel immediately.
51+
2. If it fails again, it will be resent to the queue channel named `failed-messages`.
52+
3. From now on it will be resent to the same queue channel (`failed-messages`) up to 30 times with a delay from 5 to 60 seconds, increased 1.5 times each time the message fails again.
53+
4. If the message handler throw an exception one more (33rd) time, the exception will not be caught.
54+
55+
Failures of messages, which are initially sent to the `failed-messages` channel, will only be handled by the 3rd and the 4th points of this list.
56+
57+
## Default failure handling strategies
58+
59+
Let's see the built-in defaults.
60+
61+
### SendAgainMiddleware
62+
63+
This strategy simply resends the given message to a queue. Let's see the constructor parameters through which it's configured:
64+
65+
- `id` - A unique string. Allows to use this strategy more than once for the same message, just like in example above.
66+
- `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up.
67+
- `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from.
68+
69+
### ExponentialDelayMiddleware
70+
71+
This strategy does the same thing as the `SendAgainMiddleware` with a single difference: it resends a message with an exponentially increasing delay. The delay **must** be implemented by the used `AdapterInterface` implementation.
72+
73+
It's configured via constructor parameters, too. Here they are:
74+
75+
- `id` - A unique string allows to use this strategy more than once for the same message, just like in example above.
76+
- `maxAttempts` - Maximum attempts count for this strategy with the given $id before it will give up.
77+
- `delayInitial` - The initial delay that will be applied to a message for the first time. It must be a positive float.
78+
- `delayMaximum` - The maximum delay which can be applied to a single message. Must be above the `delayInitial`.
79+
- `exponent` - Message handling delay will be muliplied by exponent each time it fails.
80+
- `queue` - The strategy will send the message to the given queue when it's not `null`. That means you can use this strategy to push a message not to the same queue channel it came from. When the `queue` parameter is set to `null`, a message will be sent to the same channel it came from.
81+
82+
## How to create a custom Failure Middleware?
83+
84+
All you need is to implement the `MiddlewareFailureInterface` and add your implementation definition to the [configuration](#configuration).
85+
This interface has the only method `handle`. And the method has these parameters:
86+
- `ConsumeRequest $request` - a request for a message handling. It consists of a message and a queue the message came from.
87+
- `Throwable $exception` - an exception thrown on the `request` handling
88+
- `MessageFailureHandlerInterface $handler` - failure strategy pipeline continuation. Your Middleware should call `$pipeline->handle()` when it shouldn't interrupt failure pipeline execution.
89+
90+
> Note: your strategy have to check by its own if it should be applied. Look into [`SendAgainMiddleware::suites()`](../../src/Middleware/Implementation/FailureMiddleware/Middleware/SendAgainMiddleware.php#L52) for an example.

docs/guide/retryable.md

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/Message/Message.php

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,25 @@
66

77
final class Message implements MessageInterface
88
{
9-
private ?string $id = null;
10-
9+
/**
10+
* @param string $handlerName
11+
* @param mixed $data Message data, encodable by a queue adapter
12+
* @param array $metadata Message metadata, encodable by a queue adapter
13+
* @param string|null $id Message id
14+
*/
1115
public function __construct(
1216
private string $handlerName,
1317
private mixed $data,
18+
private array $metadata = [],
19+
private ?string $id = null
1420
) {
15-
$this->handlerName = $handlerName;
16-
$this->data = $data;
1721
}
1822

1923
public function getHandlerName(): string
2024
{
2125
return $this->handlerName;
2226
}
2327

24-
/**
25-
* @return mixed
26-
*/
2728
public function getData(): mixed
2829
{
2930
return $this->data;
@@ -38,4 +39,9 @@ public function getId(): ?string
3839
{
3940
return $this->id;
4041
}
42+
43+
public function getMetadata(): array
44+
{
45+
return $this->metadata;
46+
}
4147
}

src/Message/MessageInterface.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,12 @@ public function getHandlerName(): string;
2727
*
2828
* @return mixed
2929
*/
30-
public function getData();
30+
public function getData(): mixed;
31+
32+
/**
33+
* Returns message metadata: timings, attempts count, metrics, etc.
34+
*
35+
* @return array
36+
*/
37+
public function getMetadata(): array;
3138
}

src/Middleware/Consume/ConsumeHandler.php renamed to src/Middleware/Consume/ConsumeFinalHandler.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
/**
1010
* @internal
1111
*/
12-
final class ConsumeHandler implements MessageHandlerConsumeInterface
12+
final class ConsumeFinalHandler implements MessageHandlerConsumeInterface
1313
{
1414
public function __construct(private Closure $handler)
1515
{

0 commit comments

Comments
 (0)