Skip to content

Commit a7e2c71

Browse files
committed
added priotization intercepter
1 parent 07276c9 commit a7e2c71

File tree

6 files changed

+399
-1
lines changed

6 files changed

+399
-1
lines changed

docs/docs/api/Dispatcher.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,6 +1274,68 @@ All deduplicated requests receive the complete response including status code, h
12741274

12751275
For observability, request deduplication events are published to the `undici:request:pending-requests` [diagnostic channel](/docs/docs/api/DiagnosticsChannel.md#undicirequestpending-requests).
12761276

1277+
##### `Priority Interceptor`
1278+
1279+
The `priority` interceptor allows you to control the order in which requests are dispatched based on an assigned priority level. Requests with higher priority are dispatched first. Requests are queued per origin and dispatched according to a configurable concurrency limit.
1280+
1281+
**Options**
1282+
1283+
- `concurrency` - The maximum number of concurrent requests per origin. Default: `1`.
1284+
- `maxQueue` - The maximum number of queued requests per origin. If the queue is full, an error is thrown. Default: `128`.
1285+
1286+
**Priority Levels**
1287+
1288+
The following named priority values are available via `interceptors.priority.PRIORITIES`:
1289+
1290+
| Name | Value |
1291+
|---|---|
1292+
| `HIGHEST` | `4` |
1293+
| `HIGH` | `3` |
1294+
| `MEDIUM` | `2` |
1295+
| `LOW` | `1` |
1296+
| `LOWEST` | `0` |
1297+
1298+
Requests without a `priority` option (or with `priority: null`/`undefined`) bypass the queue entirely and are dispatched immediately.
1299+
1300+
**Example - Basic Priority Interceptor**
1301+
1302+
```js
1303+
const { Client, interceptors } = require("undici");
1304+
const { priority } = interceptors;
1305+
const { PRIORITIES } = priority;
1306+
1307+
const client = new Client("http://service.example").compose(
1308+
priority({ concurrency: 2 })
1309+
);
1310+
1311+
// High priority request dispatches before lower ones
1312+
await client.request({
1313+
origin: "http://service.example",
1314+
method: "GET",
1315+
path: "/important",
1316+
priority: PRIORITIES.HIGH,
1317+
});
1318+
1319+
// Low priority request queued behind higher priority ones
1320+
await client.request({
1321+
origin: "http://service.example",
1322+
method: "GET",
1323+
path: "/background",
1324+
priority: PRIORITIES.LOW,
1325+
});
1326+
```
1327+
1328+
**Example - Queue Size Limit**
1329+
1330+
```js
1331+
const { Client, interceptors } = require("undici");
1332+
const { priority } = interceptors;
1333+
1334+
const client = new Client("http://service.example").compose(
1335+
priority({ concurrency: 1, maxQueue: 50 })
1336+
);
1337+
```
1338+
12771339
## Instance Events
12781340

12791341
### Event: `'connect'`

index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ module.exports.interceptors = {
5252
dns: require('./lib/interceptor/dns'),
5353
cache: require('./lib/interceptor/cache'),
5454
decompress: require('./lib/interceptor/decompress'),
55-
deduplicate: require('./lib/interceptor/deduplicate')
55+
deduplicate: require('./lib/interceptor/deduplicate'),
56+
priority: require('./lib/interceptor/priority')
5657
}
5758

5859
module.exports.cacheStores = {

lib/interceptor/priority.js

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
'use strict'
2+
3+
const DecoratorHandler = require('../handler/decorator-handler')
4+
5+
const PRIORITIES = {
6+
HIGHEST: 4,
7+
HIGH: 3,
8+
MEDIUM: 2,
9+
LOW: 1,
10+
LOWEST: 0
11+
}
12+
13+
const VALID_PRIORITIES = new Set(Object.values(PRIORITIES))
14+
15+
class PriorityQueue {
16+
#queue = []
17+
#concurrency
18+
#maxQueue
19+
#running = 0
20+
21+
constructor (concurrency = 1, maxQueue = 128) {
22+
this.#concurrency = concurrency
23+
this.#maxQueue = maxQueue
24+
}
25+
26+
get length () {
27+
return this.#queue.length
28+
}
29+
30+
acquire (callback, priority = 0) {
31+
if (this.#queue.length >= this.#maxQueue) {
32+
throw new Error(`Priority queue is full (max ${this.#maxQueue})`)
33+
}
34+
this.#queue.push({ callback, priority })
35+
this.#queue.sort((a, b) => b.priority - a.priority)
36+
this.#dispatch()
37+
}
38+
39+
release () {
40+
this.#running--
41+
this.#dispatch()
42+
}
43+
44+
#dispatch () {
45+
while (this.#running < this.#concurrency && this.#queue.length > 0) {
46+
const entry = this.#queue.shift()
47+
this.#running++
48+
entry.callback()
49+
}
50+
}
51+
}
52+
53+
class PriorityHandler extends DecoratorHandler {
54+
#priorityQueue
55+
56+
constructor (handler, priorityQueue) {
57+
super(handler)
58+
this.#priorityQueue = priorityQueue
59+
}
60+
61+
onResponseEnd (controller, trailers) {
62+
this.#release()
63+
return super.onResponseEnd(controller, trailers)
64+
}
65+
66+
onResponseError (controller, err) {
67+
this.#release()
68+
return super.onResponseError(controller, err)
69+
}
70+
71+
#release () {
72+
if (this.#priorityQueue) {
73+
const priorityQueue = this.#priorityQueue
74+
this.#priorityQueue = null
75+
priorityQueue.release()
76+
}
77+
}
78+
}
79+
80+
function createPriorityInterceptor ({ concurrency, maxQueue } = { concurrency: 1, maxQueue: 128 }) {
81+
return (dispatch) => {
82+
const queues = new Map()
83+
84+
return function priorityInterceptor (opts, handler) {
85+
if (opts.priority == null || !opts.origin) {
86+
return dispatch(opts, handler)
87+
}
88+
89+
if (!VALID_PRIORITIES.has(opts.priority)) {
90+
throw new Error(`Invalid priority ${opts.priority}. Must be one of: ${Object.keys(PRIORITIES).join(', ')} (${Object.values(PRIORITIES).join(', ')})`)
91+
}
92+
93+
let queue = queues.get(opts.origin)
94+
if (!queue) {
95+
queue = new PriorityQueue(concurrency, maxQueue)
96+
queues.set(opts.origin, queue)
97+
}
98+
99+
queue.acquire(() => {
100+
const priorityHandler = new PriorityHandler(handler, queue)
101+
try {
102+
dispatch(opts, priorityHandler)
103+
} catch (err) {
104+
priorityHandler.onResponseError(null, err)
105+
}
106+
}, opts.priority)
107+
}
108+
}
109+
}
110+
111+
createPriorityInterceptor.PRIORITIES = PRIORITIES
112+
113+
module.exports = createPriorityInterceptor

test/interceptors/priority.js

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
'use strict'
2+
3+
const { createServer } = require('node:http')
4+
const { describe, test, after } = require('node:test')
5+
const { once } = require('node:events')
6+
const { strictEqual, deepStrictEqual, throws } = require('node:assert')
7+
const { setTimeout: sleep } = require('node:timers/promises')
8+
const { Client, interceptors } = require('../../index')
9+
10+
const { PRIORITIES } = interceptors.priority
11+
12+
describe('Priority Interceptor', () => {
13+
test('dispatches requests without priority normally', async () => {
14+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
15+
res.end('ok')
16+
}).listen(0)
17+
18+
await once(server, 'listening')
19+
20+
const client = new Client(`http://localhost:${server.address().port}`)
21+
.compose(interceptors.priority())
22+
23+
after(async () => {
24+
await client.close()
25+
server.close()
26+
await once(server, 'close')
27+
})
28+
29+
const res = await client.request({
30+
origin: `http://localhost:${server.address().port}`,
31+
method: 'GET',
32+
path: '/'
33+
})
34+
35+
const body = await res.body.text()
36+
strictEqual(res.statusCode, 200)
37+
strictEqual(body, 'ok')
38+
})
39+
40+
test('dispatches requests with priority', async () => {
41+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
42+
res.end('ok')
43+
}).listen(0)
44+
45+
await once(server, 'listening')
46+
47+
const client = new Client(`http://localhost:${server.address().port}`)
48+
.compose(interceptors.priority())
49+
50+
after(async () => {
51+
await client.close()
52+
server.close()
53+
await once(server, 'close')
54+
})
55+
56+
const res = await client.request({
57+
origin: `http://localhost:${server.address().port}`,
58+
method: 'GET',
59+
path: '/',
60+
priority: PRIORITIES.LOW
61+
})
62+
63+
const body = await res.body.text()
64+
strictEqual(res.statusCode, 200)
65+
strictEqual(body, 'ok')
66+
})
67+
68+
test('higher priority requests are dispatched first', async () => {
69+
const order = []
70+
const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
71+
await sleep(50)
72+
order.push(req.url)
73+
res.end(req.url)
74+
}).listen(0)
75+
76+
await once(server, 'listening')
77+
78+
const client = new Client(`http://localhost:${server.address().port}`)
79+
.compose(interceptors.priority({ concurrency: 1 }))
80+
81+
after(async () => {
82+
await client.close()
83+
server.close()
84+
await once(server, 'close')
85+
})
86+
87+
const origin = `http://localhost:${server.address().port}`
88+
89+
// Send requests with different priorities
90+
// With concurrency 1, the first request dispatches immediately.
91+
// The remaining requests queue by priority (higher = first).
92+
const results = await Promise.all([
93+
client.request({ origin, method: 'GET', path: '/first', priority: PRIORITIES.LOW }),
94+
client.request({ origin, method: 'GET', path: '/high', priority: PRIORITIES.HIGHEST }),
95+
client.request({ origin, method: 'GET', path: '/low', priority: PRIORITIES.LOWEST }),
96+
client.request({ origin, method: 'GET', path: '/medium', priority: PRIORITIES.MEDIUM })
97+
])
98+
99+
// Read all bodies to ensure completion
100+
for (const res of results) {
101+
await res.body.text()
102+
}
103+
104+
// The first request dispatched immediately, then high, medium, low
105+
deepStrictEqual(order, ['/first', '/high', '/medium', '/low'])
106+
})
107+
108+
test('requests without priority bypass the queue', async () => {
109+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
110+
res.end('ok')
111+
}).listen(0)
112+
113+
await once(server, 'listening')
114+
115+
const client = new Client(`http://localhost:${server.address().port}`)
116+
.compose(interceptors.priority())
117+
118+
after(async () => {
119+
await client.close()
120+
server.close()
121+
await once(server, 'close')
122+
})
123+
124+
const origin = `http://localhost:${server.address().port}`
125+
126+
// Request without priority should go through directly
127+
const res = await client.request({
128+
origin,
129+
method: 'GET',
130+
path: '/'
131+
})
132+
133+
const body = await res.body.text()
134+
strictEqual(res.statusCode, 200)
135+
strictEqual(body, 'ok')
136+
})
137+
138+
test('rejects invalid priority values', async () => {
139+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
140+
res.end('ok')
141+
}).listen(0)
142+
143+
await once(server, 'listening')
144+
145+
const client = new Client(`http://localhost:${server.address().port}`)
146+
.compose(interceptors.priority())
147+
148+
after(async () => {
149+
await client.close()
150+
server.close()
151+
await once(server, 'close')
152+
})
153+
154+
const origin = `http://localhost:${server.address().port}`
155+
156+
await client.request({
157+
origin,
158+
method: 'GET',
159+
path: '/',
160+
priority: 99
161+
}).then(() => {
162+
throw new Error('should have thrown')
163+
}).catch((err) => {
164+
strictEqual(err.message.includes('Invalid priority'), true)
165+
})
166+
})
167+
168+
test('handles request errors gracefully', async () => {
169+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
170+
res.destroy()
171+
}).listen(0)
172+
173+
await once(server, 'listening')
174+
175+
const client = new Client(`http://localhost:${server.address().port}`)
176+
.compose(interceptors.priority())
177+
178+
after(async () => {
179+
await client.close()
180+
server.close()
181+
await once(server, 'close')
182+
})
183+
184+
const origin = `http://localhost:${server.address().port}`
185+
186+
await client.request({
187+
origin,
188+
method: 'GET',
189+
path: '/',
190+
priority: 1
191+
}).then(() => {
192+
throw new Error('should have thrown')
193+
}).catch((err) => {
194+
strictEqual(err.code, 'UND_ERR_SOCKET')
195+
})
196+
})
197+
})

types/dispatcher.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ declare namespace Dispatcher {
127127
throwOnError?: boolean;
128128
/** For H2, it appends the expect: 100-continue header, and halts the request body until a 100-continue is received from the remote server */
129129
expectContinue?: boolean;
130+
/** Priority of the request when used with the priority interceptor. Must be one of: HIGHEST (4), HIGH (3), MEDIUM (2), LOW (1), LOWEST (0). */
131+
priority?: 0 | 1 | 2 | 3 | 4;
130132
}
131133
export interface ConnectOptions<TOpaque = null> {
132134
origin: string | URL;

0 commit comments

Comments
 (0)