diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index 3b03d36d8e7..f5027721534 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -1274,6 +1274,68 @@ All deduplicated requests receive the complete response including status code, h For observability, request deduplication events are published to the `undici:request:pending-requests` [diagnostic channel](/docs/docs/api/DiagnosticsChannel.md#undicirequestpending-requests). +##### `Priority Interceptor` + +The `priority` interceptor allows you to control the order in which requests are dispatched based on an assigned priority level. Requests with higher priority are dispatched first. Requests are queued per origin and dispatched according to a configurable concurrency limit. + +**Options** + +- `concurrency` - The maximum number of concurrent requests per origin. Default: `1`. +- `maxQueue` - The maximum number of queued requests per origin. If the queue is full, an error is thrown. Default: `128`. + +**Priority Levels** + +The following named priority values are available via `interceptors.priority.PRIORITIES`: + +| Name | Value | +|---|---| +| `HIGHEST` | `4` | +| `HIGH` | `3` | +| `MEDIUM` | `2` | +| `LOW` | `1` | +| `LOWEST` | `0` | + +Requests without a `priority` option (or with `priority: null`/`undefined`) bypass the queue entirely and are dispatched immediately. + +**Example - Basic Priority Interceptor** + +```js +const { Client, interceptors } = require("undici"); +const { priority } = interceptors; +const { PRIORITIES } = priority; + +const client = new Client("http://service.example").compose( + priority({ concurrency: 2 }) +); + +// High priority request dispatches before lower ones +await client.request({ + origin: "http://service.example", + method: "GET", + path: "/important", + priority: PRIORITIES.HIGH, +}); + +// Low priority request queued behind higher priority ones +await client.request({ + origin: "http://service.example", + method: "GET", + path: "/background", + priority: PRIORITIES.LOW, +}); +``` + +**Example - Queue Size Limit** + +```js +const { Client, interceptors } = require("undici"); +const { priority } = interceptors; + +const client = new Client("http://service.example").compose( + priority({ concurrency: 1, maxQueue: 50 }) +); +``` + ## Instance Events ### Event: `'connect'` diff --git a/index.js b/index.js index 708a8ee80c5..18b21c9e297 100644 --- a/index.js +++ b/index.js @@ -52,7 +52,8 @@ module.exports.interceptors = { dns: require('./lib/interceptor/dns'), cache: require('./lib/interceptor/cache'), decompress: require('./lib/interceptor/decompress'), - deduplicate: require('./lib/interceptor/deduplicate') + deduplicate: require('./lib/interceptor/deduplicate'), + priority: require('./lib/interceptor/priority') } module.exports.cacheStores = { diff --git a/lib/interceptor/priority.js b/lib/interceptor/priority.js new file mode 100644 index 00000000000..9f03118a477 --- /dev/null +++ b/lib/interceptor/priority.js @@ -0,0 +1,113 @@ +'use strict' + +const DecoratorHandler = require('../handler/decorator-handler') + +const PRIORITIES = { + HIGHEST: 4, + HIGH: 3, + MEDIUM: 2, + LOW: 1, + LOWEST: 0 +} + +const VALID_PRIORITIES = new Set(Object.values(PRIORITIES)) + +class PriorityQueue { + #queue = [] + #concurrency + #maxQueue + #running = 0 + + constructor (concurrency = 1, maxQueue = 128) { + this.#concurrency = concurrency + this.#maxQueue = maxQueue + } + + get length () { + return this.#queue.length + } + + acquire (callback, priority = 0) { + if (this.#queue.length >= this.#maxQueue) { + throw new Error(`Priority queue is full (max ${this.#maxQueue})`) + } + this.#queue.push({ callback, priority }) + this.#queue.sort((a, b) => b.priority - a.priority) + this.#dispatch() + } + + release () { + this.#running-- + this.#dispatch() + } + + #dispatch () { + while (this.#running < this.#concurrency && this.#queue.length > 0) { + const entry = this.#queue.shift() + this.#running++ + entry.callback() + } + } +} + +class PriorityHandler extends DecoratorHandler { + #priorityQueue + + constructor (handler, priorityQueue) { + super(handler) + this.#priorityQueue = priorityQueue + } + + onResponseEnd (controller, trailers) { + this.#release() + return super.onResponseEnd(controller, trailers) + } + + onResponseError (controller, err) { + this.#release() + return super.onResponseError(controller, err) + } + + #release () { + if (this.#priorityQueue) { + const priorityQueue = this.#priorityQueue + this.#priorityQueue = null + priorityQueue.release() + } + } +} + +function createPriorityInterceptor ({ concurrency, maxQueue } = { concurrency: 1, maxQueue: 128 }) { + return (dispatch) => { + const queues = new Map() + + return function priorityInterceptor (opts, handler) { + if (opts.priority == null || !opts.origin) { + return dispatch(opts, handler) + } + + if (!VALID_PRIORITIES.has(opts.priority)) { + throw new Error(`Invalid priority ${opts.priority}. Must be one of: ${Object.keys(PRIORITIES).join(', ')} (${Object.values(PRIORITIES).join(', ')})`) + } + + let queue = queues.get(opts.origin) + if (!queue) { + queue = new PriorityQueue(concurrency, maxQueue) + queues.set(opts.origin, queue) + } + + queue.acquire(() => { + const priorityHandler = new PriorityHandler(handler, queue) + try { + dispatch(opts, priorityHandler) + } catch (err) { + priorityHandler.onResponseError(null, err) + } + }, opts.priority) + } + } +} + +createPriorityInterceptor.PRIORITIES = PRIORITIES + +module.exports = createPriorityInterceptor diff --git a/test/interceptors/priority.js b/test/interceptors/priority.js new file mode 100644 index 00000000000..d160df1d63c --- /dev/null +++ b/test/interceptors/priority.js @@ -0,0 +1,197 @@ +'use strict' + +const { createServer } = require('node:http') +const { describe, test, after } = require('node:test') +const { once } = require('node:events') +const { strictEqual, deepStrictEqual } = require('node:assert') +const { setTimeout: sleep } = require('node:timers/promises') +const { Client, interceptors } = require('../../index') + +const { PRIORITIES } = interceptors.priority + +describe('Priority Interceptor', () => { + test('dispatches requests without priority normally', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const res = await client.request({ + origin: `http://localhost:${server.address().port}`, + method: 'GET', + path: '/' + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('dispatches requests with priority', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const res = await client.request({ + origin: `http://localhost:${server.address().port}`, + method: 'GET', + path: '/', + priority: PRIORITIES.LOW + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('higher priority requests are dispatched first', async () => { + const order = [] + const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => { + await sleep(50) + order.push(req.url) + res.end(req.url) + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority({ concurrency: 1 })) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + // Send requests with different priorities + // With concurrency 1, the first request dispatches immediately. + // The remaining requests queue by priority (higher = first). + const results = await Promise.all([ + client.request({ origin, method: 'GET', path: '/first', priority: PRIORITIES.LOW }), + client.request({ origin, method: 'GET', path: '/high', priority: PRIORITIES.HIGHEST }), + client.request({ origin, method: 'GET', path: '/low', priority: PRIORITIES.LOWEST }), + client.request({ origin, method: 'GET', path: '/medium', priority: PRIORITIES.MEDIUM }) + ]) + + // Read all bodies to ensure completion + for (const res of results) { + await res.body.text() + } + + // The first request dispatched immediately, then high, medium, low + deepStrictEqual(order, ['/first', '/high', '/medium', '/low']) + }) + + test('requests without priority bypass the queue', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + // Request without priority should go through directly + const res = await client.request({ + origin, + method: 'GET', + path: '/' + }) + + const body = await res.body.text() + strictEqual(res.statusCode, 200) + strictEqual(body, 'ok') + }) + + test('rejects invalid priority values', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.end('ok') + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + await client.request({ + origin, + method: 'GET', + path: '/', + priority: 99 + }).then(() => { + throw new Error('should have thrown') + }).catch((err) => { + strictEqual(err.message.includes('Invalid priority'), true) + }) + }) + + test('handles request errors gracefully', async () => { + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.destroy() + }).listen(0) + + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.priority()) + + after(async () => { + await client.close() + server.close() + await once(server, 'close') + }) + + const origin = `http://localhost:${server.address().port}` + + await client.request({ + origin, + method: 'GET', + path: '/', + priority: 1 + }).then(() => { + throw new Error('should have thrown') + }).catch((err) => { + strictEqual(err.code, 'UND_ERR_SOCKET') + }) + }) +}) diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 9f0d5d55268..2685bb55416 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -127,6 +127,8 @@ declare namespace Dispatcher { throwOnError?: boolean; /** For H2, it appends the expect: 100-continue header, and halts the request body until a 100-continue is received from the remote server */ expectContinue?: boolean; + /** Priority of the request when used with the priority interceptor. Must be one of: HIGHEST (4), HIGH (3), MEDIUM (2), LOW (1), LOWEST (0). */ + priority?: 0 | 1 | 2 | 3 | 4; } export interface ConnectOptions { origin: string | URL; diff --git a/types/interceptors.d.ts b/types/interceptors.d.ts index 1dfd04f2a9b..bfaa4c706b7 100644 --- a/types/interceptors.d.ts +++ b/types/interceptors.d.ts @@ -62,6 +62,28 @@ declare namespace Interceptors { excludeHeaderNames?: string[] } + // Priority interceptor + export type Priority = 0 | 1 | 2 | 3 | 4 + export type PriorityInterceptorOpts = { + /** + * Maximum number of concurrent requests per origin. + * @default 1 + */ + concurrency?: number + /** + * Maximum number of queued requests per origin. + * @default 128 + */ + maxQueue?: number + } + export const PRIORITIES: { + HIGHEST: 4 + HIGH: 3 + MEDIUM: 2 + LOW: 1 + LOWEST: 0 + } + export function dump (opts?: DumpInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export function retry (opts?: RetryInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export function redirect (opts?: RedirectInterceptorOpts): Dispatcher.DispatcherComposeInterceptor @@ -70,4 +92,5 @@ declare namespace Interceptors { export function dns (opts?: DNSInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export function cache (opts?: CacheInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export function deduplicate (opts?: DeduplicateInterceptorOpts): Dispatcher.DispatcherComposeInterceptor + export function priority (opts?: PriorityInterceptorOpts): Dispatcher.DispatcherComposeInterceptor }