Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions docs/docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,68 @@ All deduplicated requests receive the complete response including status code, h

For observability, request deduplication events are published to the `undici:request:pending-requests` [diagnostic channel](/docs/docs/api/DiagnosticsChannel.md#undicirequestpending-requests).

##### `Priority Interceptor`

The `priority` interceptor allows you to control the order in which requests are dispatched based on an assigned priority level. Requests with higher priority are dispatched first. Requests are queued per origin and dispatched according to a configurable concurrency limit.

**Options**

- `concurrency` - The maximum number of concurrent requests per origin. Default: `1`.
- `maxQueue` - The maximum number of queued requests per origin. If the queue is full, an error is thrown. Default: `128`.

**Priority Levels**

The following named priority values are available via `interceptors.priority.PRIORITIES`:

| Name | Value |
|---|---|
| `HIGHEST` | `4` |
| `HIGH` | `3` |
| `MEDIUM` | `2` |
| `LOW` | `1` |
| `LOWEST` | `0` |

Requests without a `priority` option (or with `priority: null`/`undefined`) bypass the queue entirely and are dispatched immediately.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit hesitant if we should allow that, as these request can be seen as top-priority kind of disregarding the purpose of the queue.

I can imagine that a 2 could be a good default if null or undefined. wdyt?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I concur, there should be a default priority.


**Example - Basic Priority Interceptor**

```js
const { Client, interceptors } = require("undici");
const { priority } = interceptors;
const { PRIORITIES } = priority;

const client = new Client("http://service.example").compose(
priority({ concurrency: 2 })
);

// High priority request dispatches before lower ones
await client.request({
origin: "http://service.example",
method: "GET",
path: "/important",
priority: PRIORITIES.HIGH,
});

// Low priority request queued behind higher priority ones
await client.request({
origin: "http://service.example",
method: "GET",
path: "/background",
priority: PRIORITIES.LOW,
});
```

**Example - Queue Size Limit**

```js
const { Client, interceptors } = require("undici");
const { priority } = interceptors;

const client = new Client("http://service.example").compose(
priority({ concurrency: 1, maxQueue: 50 })
);
```

## Instance Events

### Event: `'connect'`
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ module.exports.interceptors = {
dns: require('./lib/interceptor/dns'),
cache: require('./lib/interceptor/cache'),
decompress: require('./lib/interceptor/decompress'),
deduplicate: require('./lib/interceptor/deduplicate')
deduplicate: require('./lib/interceptor/deduplicate'),
priority: require('./lib/interceptor/priority')
}

module.exports.cacheStores = {
Expand Down
113 changes: 113 additions & 0 deletions lib/interceptor/priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict'

const DecoratorHandler = require('../handler/decorator-handler')

const PRIORITIES = {
HIGHEST: 4,
HIGH: 3,
MEDIUM: 2,
LOW: 1,
LOWEST: 0
}

const VALID_PRIORITIES = new Set(Object.values(PRIORITIES))

class PriorityQueue {
#queue = []
#concurrency
#maxQueue
#running = 0

constructor (concurrency = 1, maxQueue = 128) {
this.#concurrency = concurrency
this.#maxQueue = maxQueue
}

get length () {
return this.#queue.length
}

acquire (callback, priority = 0) {
if (this.#queue.length >= this.#maxQueue) {
throw new Error(`Priority queue is full (max ${this.#maxQueue})`)
}
this.#queue.push({ callback, priority })
this.#queue.sort((a, b) => b.priority - a.priority)
this.#dispatch()
}

release () {
this.#running--
this.#dispatch()
}

#dispatch () {
while (this.#running < this.#concurrency && this.#queue.length > 0) {
const entry = this.#queue.shift()
this.#running++
entry.callback()
}
}
}

class PriorityHandler extends DecoratorHandler {
#priorityQueue

constructor (handler, priorityQueue) {
super(handler)
this.#priorityQueue = priorityQueue
}

onResponseEnd (controller, trailers) {
this.#release()
return super.onResponseEnd(controller, trailers)
}

onResponseError (controller, err) {
this.#release()
return super.onResponseError(controller, err)
}

#release () {
if (this.#priorityQueue) {
const priorityQueue = this.#priorityQueue
this.#priorityQueue = null
priorityQueue.release()
}
}
}

function createPriorityInterceptor ({ concurrency, maxQueue } = { concurrency: 1, maxQueue: 128 }) {
return (dispatch) => {
const queues = new Map()

return function priorityInterceptor (opts, handler) {
if (opts.priority == null || !opts.origin) {
return dispatch(opts, handler)
}

if (!VALID_PRIORITIES.has(opts.priority)) {
throw new Error(`Invalid priority ${opts.priority}. Must be one of: ${Object.keys(PRIORITIES).join(', ')} (${Object.values(PRIORITIES).join(', ')})`)
}

let queue = queues.get(opts.origin)
if (!queue) {
queue = new PriorityQueue(concurrency, maxQueue)
queues.set(opts.origin, queue)
}

queue.acquire(() => {
const priorityHandler = new PriorityHandler(handler, queue)
try {
dispatch(opts, priorityHandler)
} catch (err) {
priorityHandler.onResponseError(null, err)
}
}, opts.priority)
}
}
}

createPriorityInterceptor.PRIORITIES = PRIORITIES

module.exports = createPriorityInterceptor
197 changes: 197 additions & 0 deletions test/interceptors/priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
'use strict'

const { createServer } = require('node:http')
const { describe, test, after } = require('node:test')
const { once } = require('node:events')
const { strictEqual, deepStrictEqual } = require('node:assert')
const { setTimeout: sleep } = require('node:timers/promises')
const { Client, interceptors } = require('../../index')

const { PRIORITIES } = interceptors.priority

describe('Priority Interceptor', () => {
test('dispatches requests without priority normally', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const res = await client.request({
origin: `http://localhost:${server.address().port}`,
method: 'GET',
path: '/'
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('dispatches requests with priority', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const res = await client.request({
origin: `http://localhost:${server.address().port}`,
method: 'GET',
path: '/',
priority: PRIORITIES.LOW
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('higher priority requests are dispatched first', async () => {
const order = []
const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
await sleep(50)
order.push(req.url)
res.end(req.url)
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority({ concurrency: 1 }))

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

// Send requests with different priorities
// With concurrency 1, the first request dispatches immediately.
// The remaining requests queue by priority (higher = first).
const results = await Promise.all([
client.request({ origin, method: 'GET', path: '/first', priority: PRIORITIES.LOW }),
client.request({ origin, method: 'GET', path: '/high', priority: PRIORITIES.HIGHEST }),
client.request({ origin, method: 'GET', path: '/low', priority: PRIORITIES.LOWEST }),
client.request({ origin, method: 'GET', path: '/medium', priority: PRIORITIES.MEDIUM })
])

// Read all bodies to ensure completion
for (const res of results) {
await res.body.text()
}

// The first request dispatched immediately, then high, medium, low
deepStrictEqual(order, ['/first', '/high', '/medium', '/low'])
})

test('requests without priority bypass the queue', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

// Request without priority should go through directly
const res = await client.request({
origin,
method: 'GET',
path: '/'
})

const body = await res.body.text()
strictEqual(res.statusCode, 200)
strictEqual(body, 'ok')
})

test('rejects invalid priority values', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end('ok')
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

await client.request({
origin,
method: 'GET',
path: '/',
priority: 99
}).then(() => {
throw new Error('should have thrown')
}).catch((err) => {
strictEqual(err.message.includes('Invalid priority'), true)
})
})

test('handles request errors gracefully', async () => {
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.destroy()
}).listen(0)

await once(server, 'listening')

const client = new Client(`http://localhost:${server.address().port}`)
.compose(interceptors.priority())

after(async () => {
await client.close()
server.close()
await once(server, 'close')
})

const origin = `http://localhost:${server.address().port}`

await client.request({
origin,
method: 'GET',
path: '/',
priority: 1
}).then(() => {
throw new Error('should have thrown')
}).catch((err) => {
strictEqual(err.code, 'UND_ERR_SOCKET')
})
})
})
2 changes: 2 additions & 0 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ declare namespace Dispatcher {
throwOnError?: boolean;
/** For H2, it appends the expect: 100-continue header, and halts the request body until a 100-continue is received from the remote server */
expectContinue?: boolean;
/** Priority of the request when used with the priority interceptor. Must be one of: HIGHEST (4), HIGH (3), MEDIUM (2), LOW (1), LOWEST (0). */
priority?: 0 | 1 | 2 | 3 | 4;
}
export interface ConnectOptions<TOpaque = null> {
origin: string | URL;
Expand Down
Loading
Loading