Skip to content

Commit 6c115ff

Browse files
feat: websocket reconnect (#405)
* wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * feat: websocket reconnection * feat: add onReconnect hook to wsReconnect * fix: await onReconnect * wip * feat: introduce hooks on message * chore: default hooks * wip * wip * wip * add tests * add reconnection example * add params to hooks
1 parent b2f29b5 commit 6c115ff

File tree

15 files changed

+1341
-10
lines changed

15 files changed

+1341
-10
lines changed

README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,37 @@ It also supports an additional `rewriteRequestHeaders(headers, request)` functio
227227
opening the WebSocket connection. This function should return an object with the given headers.
228228
The default implementation forwards the `cookie` header.
229229

230+
## `wsReconnect`
231+
232+
**Experimental.** (default: `disabled`)
233+
234+
Reconnection feature detects and closes broken connections and reconnects automatically, see [how to detect and close broken connections](https://github.com/websockets/ws#how-to-detect-and-close-broken-connections).
235+
The connection is considered broken if the target does not respond to the ping messages or no data is received from the target.
236+
237+
The `wsReconnect` option contains the configuration for the WebSocket reconnection feature.
238+
To enable the feature, set the `wsReconnect` option to an object with the following properties:
239+
240+
- `pingInterval`: The interval between ping messages in ms (default: `30_000`).
241+
- `maxReconnectionRetries`: The maximum number of reconnection retries (`1` to `Infinity`, default: `Infinity`).
242+
- `reconnectInterval`: The interval between reconnection attempts in ms (default: `1_000`).
243+
- `reconnectDecay`: The decay factor for the reconnection interval (default: `1.5`).
244+
- `connectionTimeout`: The timeout for establishing the connection in ms (default: `5_000`).
245+
- `reconnectOnClose`: Whether to reconnect on close, as long as the connection from the related client to the proxy is active (default: `false`).
246+
- `logs`: Whether to log the reconnection process (default: `false`).
247+
248+
See the example in [examples/reconnection](examples/reconnection).
249+
250+
## wsHooks
251+
252+
On websocket events, the following hooks are available, note **the hooks are all synchronous**.
253+
254+
- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(source, target, { data, binary })` (default: `undefined`).
255+
- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(source, target, { data, binary })` (default: `undefined`).
256+
- `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`).
257+
- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`).
258+
- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
259+
- `onPong`: A hook function that is called when the target responds to the ping `onPong(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
260+
230261
## Benchmarks
231262

232263
The following benchmarks were generated on a dedicated server with an Intel(R) Core(TM) i7-7700 CPU @ 3.60GHz and 64GB of RAM:
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Reconnection Example
2+
3+
This example demonstrates how to use the reconnection feature of the proxy.
4+
5+
It simulates an unstable target service: slow to start, unresponsive due to block of the event loop, crash and restart.
6+
7+
The goal is to ensures a more resilient and customizable integration, minimizing disruptions caused by connection instability.
8+
9+
10+
## How to run
11+
12+
Run the unstable target
13+
14+
```
15+
cd examples/reconnection/unstable-target
16+
npm run unstable
17+
```
18+
19+
Run the proxy
20+
21+
```
22+
cd examples/reconnection/proxy
23+
npm run start
24+
```
25+
26+
Then run the client
27+
28+
```
29+
cd examples/reconnection/client
30+
npm run start
31+
```
32+
33+
---
34+
35+
## How it works
36+
37+
### Proxy Connection Monitoring and Recovery
38+
39+
The proxy monitors the target connection using a ping/pong mechanism. If a pong response does not arrive on time, the connection is closed, and the proxy attempts to reconnect.
40+
41+
If the target service crashes, the connection may close either gracefully or abruptly. Regardless of how the disconnection occurs, the proxy detects the connection loss and initiates a reconnection attempt.
42+
43+
### Connection Stability
44+
45+
- The connection between the client and the proxy remains unaffected by an unstable target.
46+
- The connection between the proxy and the target may be closed due to:
47+
- The target failing to respond to ping messages, even if the connection is still technically open (e.g., due to a freeze or blockage).
48+
- The target crashing and restarting.
49+
50+
### Handling Data Loss During Reconnection
51+
52+
The proxy supports hooks to manage potential data loss during reconnection. These hooks allow for custom logic to ensure message integrity when resending data from the client to the target.
53+
54+
Examples of how hooks can be used based on the target service type:
55+
56+
- GraphQL subscriptions: Resend the subscription from the last received message.
57+
- Message brokers: Resend messages starting from the last successfully processed message.
58+
59+
In this example, the proxy re-sends the messages from the last ping to ensure all the messages are sent to the target, without any additional logic.
60+
Resending messages from the last pong ensures that the target does not miss any messages, but it may send messages more than once.

examples/reconnection/client/index.js

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict'
2+
3+
const WebSocket = require('ws')
4+
5+
const port = process.env.PORT || 3001
6+
7+
// connect to proxy
8+
9+
const url = `ws://localhost:${port}/`
10+
const ws = new WebSocket(url)
11+
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8', objectMode: true })
12+
13+
client.setEncoding('utf8')
14+
15+
let i = 1
16+
setInterval(() => {
17+
client.write(JSON.stringify({
18+
message: i
19+
}))
20+
i++
21+
}, 1000).unref()
22+
const responses = {}
23+
24+
client.on('data', message => {
25+
const data = JSON.parse(message)
26+
console.log('Received', data)
27+
responses[data.response] = responses[data.response] ? responses[data.response] + 1 : 1
28+
})
29+
30+
client.on('error', error => {
31+
console.log('Error')
32+
console.error(error)
33+
})
34+
35+
client.on('close', () => {
36+
console.log('\n\n\nConnection closed')
37+
38+
console.log('\n\n\nResponses')
39+
for (const key in responses) {
40+
if (!responses[key]) {
41+
console.log('missing', key)
42+
} else if (responses[key] !== 1) {
43+
console.log('extra messages', key, responses[key])
44+
}
45+
}
46+
})
47+
48+
client.on('unexpected-response', (error) => {
49+
console.log('Unexpected response')
50+
console.error(error)
51+
})
52+
53+
client.on('redirect', (error) => {
54+
console.log('Redirect')
55+
console.error(error)
56+
})
57+
58+
client.on('upgrade', (error) => {
59+
console.log('Upgrade')
60+
console.error(error)
61+
})
62+
63+
client.on('ping', (error) => {
64+
console.log('Ping')
65+
console.error(error)
66+
})
67+
68+
client.on('pong', (error) => {
69+
console.log('Pong')
70+
console.error(error)
71+
})
72+
73+
process.on('SIGINT', () => {
74+
client.end()
75+
})
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "client",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"start": "node index.js",
7+
"dev": "node --watch index.js"
8+
},
9+
"dependencies": {
10+
"ws": "^8.18.0"
11+
}
12+
}

examples/reconnection/proxy/index.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
'use strict'
2+
3+
const { setTimeout: wait } = require('node:timers/promises')
4+
const fastify = require('fastify')
5+
const fastifyHttpProxy = require('../../../')
6+
7+
async function main () {
8+
const port = process.env.PORT || 3001
9+
10+
const wsReconnect = {
11+
logs: true,
12+
pingInterval: 3_000,
13+
reconnectOnClose: true,
14+
}
15+
16+
let backup = []
17+
let lastPong = Date.now()
18+
19+
// resend messages from last ping
20+
// it may send messages more than once
21+
// in case the target already received messages between last ping and the reconnection
22+
async function resendMessages (target) {
23+
const now = Date.now()
24+
25+
for (const m of backup) {
26+
if (m.timestamp < lastPong || m.timestamp > now) {
27+
continue
28+
}
29+
console.log(' >>> resending message #', m)
30+
target.send(m.message)
31+
// introduce a small delay to avoid to flood the target
32+
await wait(250)
33+
}
34+
};
35+
36+
const wsHooks = {
37+
onPong: () => {
38+
console.log('onPong')
39+
lastPong = Date.now()
40+
// clean backup from the last ping
41+
backup = backup.filter(message => message.timestamp > lastPong)
42+
},
43+
onIncomingMessage: (source, target, message) => {
44+
const m = message.data.toString()
45+
console.log('onIncomingMessage backup', m)
46+
backup.push({ message: m, timestamp: Date.now() })
47+
},
48+
onDisconnect: () => {
49+
console.log('onDisconnect')
50+
backup.length = 0
51+
},
52+
onReconnect: (source, target) => {
53+
console.log('onReconnect')
54+
resendMessages(target)
55+
},
56+
}
57+
58+
const proxy = fastify({ logger: true })
59+
proxy.register(fastifyHttpProxy, {
60+
upstream: 'http://localhost:3000/',
61+
websocket: true,
62+
wsUpstream: 'ws://localhost:3000/',
63+
wsReconnect,
64+
wsHooks,
65+
})
66+
67+
await proxy.listen({ port })
68+
}
69+
70+
main()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"name": "proxy",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"start": "node index.js",
7+
"dev": "node --watch index.js"
8+
},
9+
"dependencies": {
10+
"fastify": "^5.2.1"
11+
}
12+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict'
2+
3+
const { setTimeout: wait } = require('node:timers/promises')
4+
const fastify = require('fastify')
5+
6+
// unstable service
7+
8+
async function main () {
9+
const SLOW_START = process.env.SLOW_START || 2_000
10+
const UNSTABLE_MIN = process.env.UNSTABLE_MIN || 1_000
11+
const UNSTABLE_MAX = process.env.UNSTABLE_MAX || 10_000
12+
const BLOCK_TIME = process.env.BLOCK_TIME || 5_000
13+
14+
const app = fastify({ logger: true })
15+
16+
// slow start
17+
18+
await wait(SLOW_START)
19+
20+
app.register(require('@fastify/websocket'))
21+
app.register(async function (app) {
22+
app.get('/', { websocket: true }, (socket) => {
23+
socket.on('message', message => {
24+
let m = message.toString()
25+
console.log('incoming message', m)
26+
m = JSON.parse(m)
27+
28+
socket.send(JSON.stringify({
29+
response: m.message
30+
}))
31+
})
32+
})
33+
})
34+
35+
try {
36+
const port = process.env.PORT || 3000
37+
await app.listen({ port })
38+
} catch (err) {
39+
app.log.error(err)
40+
process.exit(1)
41+
}
42+
43+
if (process.env.STABLE) {
44+
return
45+
}
46+
47+
function runProblem () {
48+
const problem = process.env.PROBLEM || (Math.random() < 0.5 ? 'crash' : 'block')
49+
const unstabilityTimeout = process.env.UNSTABLE_TIMEOUT || Math.round(UNSTABLE_MIN + Math.random() * (UNSTABLE_MAX - UNSTABLE_MIN))
50+
51+
if (problem === 'crash') {
52+
console.log(`Restarting (crash and restart) in ${unstabilityTimeout}ms`)
53+
setTimeout(() => {
54+
console.log('UNHANDLED EXCEPTION')
55+
throw new Error('UNHANDLED EXCEPTION')
56+
}, unstabilityTimeout).unref()
57+
} else {
58+
console.log(`Blocking EL in ${unstabilityTimeout}ms for ${BLOCK_TIME}ms`)
59+
60+
setTimeout(() => {
61+
console.log('Block EL ...')
62+
const start = performance.now()
63+
while (performance.now() - start < BLOCK_TIME) {
64+
// just block
65+
}
66+
console.log('Block ends')
67+
runProblem()
68+
}, unstabilityTimeout).unref()
69+
}
70+
}
71+
72+
runProblem()
73+
}
74+
75+
main()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"name": "unstable-target",
3+
"version": "1.0.0",
4+
"main": "index.js",
5+
"scripts": {
6+
"stable": "STABLE=1 node index.js",
7+
"unstable": "forever index.js",
8+
"dev": "node --watch index.js"
9+
},
10+
"dependencies": {
11+
"fastify": "^5.2.1",
12+
"forever": "^4.0.3"
13+
}
14+
}

0 commit comments

Comments
 (0)