Skip to content

Commit af293f3

Browse files
authored
Allow attaching multiple WebSocket proxies (#258)
1 parent 81a2a2d commit af293f3

File tree

3 files changed

+131
-41
lines changed

3 files changed

+131
-41
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ An array that contains the types of the methods. Default: `['DELETE', 'GET', 'HE
160160
This module has _partial_ support for forwarding websockets by passing a
161161
`websocket` option. All those options are going to be forwarded to
162162
[`@fastify/websocket`](https://github.com/fastify/fastify-websocket).
163+
164+
Multiple websocket proxies may be attached to the same HTTP server at different paths.
165+
In this case, only the first `wsServerOptions` is applied.
166+
163167
A few things are missing:
164168

165169
1. forwarding headers as well as `rewriteHeaders`. Note: Only cookie headers are being forwarded

index.js

Lines changed: 81 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -58,37 +58,76 @@ function proxyWebSockets (source, target) {
5858
target.on('unexpected-response', () => close(1011, 'unexpected response'))
5959
}
6060

61-
function setupWebSocketProxy (fastify, options, rewritePrefix) {
62-
const server = new WebSocket.Server({
63-
server: fastify.server,
64-
...options.wsServerOptions
65-
})
66-
67-
fastify.addHook('onClose', (instance, done) => server.close(done))
68-
69-
// To be able to close the HTTP server,
70-
// all WebSocket clients need to be disconnected.
71-
// Fastify is missing a pre-close event, or the ability to
72-
// add a hook before the server.close call. We need to resort
73-
// to monkeypatching for now.
74-
const oldClose = fastify.server.close
75-
fastify.server.close = function (done) {
76-
for (const client of server.clients) {
77-
client.close()
61+
class WebSocketProxy {
62+
constructor (fastify, wsServerOptions) {
63+
this.logger = fastify.log
64+
65+
const wss = new WebSocket.Server({
66+
server: fastify.server,
67+
...wsServerOptions
68+
})
69+
70+
// To be able to close the HTTP server,
71+
// all WebSocket clients need to be disconnected.
72+
// Fastify is missing a pre-close event, or the ability to
73+
// add a hook before the server.close call. We need to resort
74+
// to monkeypatching for now.
75+
const oldClose = fastify.server.close
76+
fastify.server.close = function (done) {
77+
for (const client of wss.clients) {
78+
client.close()
79+
}
80+
oldClose.call(this, done)
7881
}
79-
oldClose.call(this, done)
82+
83+
wss.on('error', (err) => {
84+
this.logger.error(err)
85+
})
86+
87+
wss.on('connection', this.handleConnection.bind(this))
88+
89+
this.wss = wss
90+
this.prefixList = []
8091
}
8192

82-
server.on('error', (err) => {
83-
fastify.log.error(err)
84-
})
93+
close (done) {
94+
this.wss.close(done)
95+
}
96+
97+
addUpstream (prefix, rewritePrefix, upstream, wsClientOptions) {
98+
this.prefixList.push({
99+
prefix: new URL(prefix, 'ws://127.0.0.1').pathname,
100+
rewritePrefix,
101+
upstream: convertUrlToWebSocket(upstream),
102+
wsClientOptions
103+
})
104+
105+
// sort by decreasing prefix length, so that findUpstreamUrl() does longest prefix match
106+
this.prefixList.sort((a, b) => b.prefix.length - a.prefix.length)
107+
}
85108

86-
server.on('connection', (source, request) => {
87-
if (fastify.prefix && !request.url.startsWith(fastify.prefix)) {
88-
fastify.log.debug({ url: request.url }, 'not matching prefix')
109+
findUpstream (request) {
110+
const source = new URL(request.url, 'ws://127.0.0.1')
111+
112+
for (const { prefix, rewritePrefix, upstream, wsClientOptions } of this.prefixList) {
113+
if (source.pathname.startsWith(prefix)) {
114+
const target = new URL(source.pathname.replace(prefix, rewritePrefix), upstream)
115+
target.search = source.search
116+
return { target, wsClientOptions }
117+
}
118+
}
119+
120+
return undefined
121+
}
122+
123+
handleConnection (source, request) {
124+
const upstream = this.findUpstream(request)
125+
if (!upstream) {
126+
this.logger.debug({ url: request.url }, 'not matching prefix')
89127
source.close()
90128
return
91129
}
130+
const { target: url, wsClientOptions } = upstream
92131

93132
const subprotocols = []
94133
if (source.protocol) {
@@ -98,31 +137,32 @@ function setupWebSocketProxy (fastify, options, rewritePrefix) {
98137
let optionsWs = {}
99138
if (request.headers.cookie) {
100139
const headers = { cookie: request.headers.cookie }
101-
optionsWs = { ...options.wsClientOptions, headers }
140+
optionsWs = { ...wsClientOptions, headers }
102141
} else {
103-
optionsWs = options.wsClientOptions
142+
optionsWs = wsClientOptions
104143
}
105144

106-
const url = createWebSocketUrl(request)
107-
108145
const target = new WebSocket(url, subprotocols, optionsWs)
109-
110-
fastify.log.debug({ url: url.href }, 'proxy websocket')
146+
this.logger.debug({ url: url.href }, 'proxy websocket')
111147
proxyWebSockets(source, target)
112-
})
113-
114-
function createWebSocketUrl (request) {
115-
const source = new URL(request.url, 'ws://127.0.0.1')
116-
117-
const target = new URL(
118-
source.pathname.replace(fastify.prefix, rewritePrefix),
119-
convertUrlToWebSocket(options.upstream)
120-
)
148+
}
149+
}
121150

122-
target.search = source.search
151+
const httpWss = new WeakMap() // http.Server => WebSocketProxy
123152

124-
return target
153+
function setupWebSocketProxy (fastify, options, rewritePrefix) {
154+
let wsProxy = httpWss.get(fastify.server)
155+
if (!wsProxy) {
156+
wsProxy = new WebSocketProxy(fastify, options.wsServerOptions)
157+
httpWss.set(fastify.server, wsProxy)
158+
159+
fastify.addHook('onClose', (instance, done) => {
160+
httpWss.delete(fastify.server)
161+
wsProxy.close(done)
162+
})
125163
}
164+
165+
wsProxy.addUpstream(fastify.prefix, rewritePrefix, options.upstream, options.wsClientOptions)
126166
}
127167

128168
function generateRewritePrefix (prefix = '', opts) {

test/websocket.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,52 @@ test('basic websocket proxy', async (t) => {
5757
])
5858
})
5959

60+
test('multiple websocket upstreams', async (t) => {
61+
t.plan(8)
62+
63+
const server = Fastify()
64+
65+
for (const name of ['/A', '/A/B', '/C/D', '/C']) {
66+
const origin = createServer()
67+
const wss = new WebSocket.Server({ server: origin })
68+
t.teardown(wss.close.bind(wss))
69+
t.teardown(origin.close.bind(origin))
70+
71+
wss.once('connection', (ws) => {
72+
ws.once('message', message => {
73+
t.equal(message.toString(), `hello ${name}`)
74+
// echo
75+
ws.send(message)
76+
})
77+
})
78+
79+
await promisify(origin.listen.bind(origin))({ port: 0 })
80+
server.register(proxy, {
81+
prefix: name,
82+
upstream: `ws://localhost:${origin.address().port}`,
83+
websocket: true
84+
})
85+
}
86+
87+
await server.listen({ port: 0 })
88+
t.teardown(server.close.bind(server))
89+
90+
const wsClients = []
91+
for (const name of ['/A', '/A/B', '/C/D', '/C']) {
92+
const ws = new WebSocket(`ws://localhost:${server.server.address().port}${name}`)
93+
await once(ws, 'open')
94+
ws.send(`hello ${name}`)
95+
const [reply] = await once(ws, 'message')
96+
t.equal(reply.toString(), `hello ${name}`)
97+
wsClients.push(ws)
98+
}
99+
100+
await Promise.all([
101+
...wsClients.map(ws => once(ws, 'close')),
102+
server.close()
103+
])
104+
})
105+
60106
test('captures errors on start', async (t) => {
61107
const app = Fastify()
62108
await app.listen({ port: 0 })

0 commit comments

Comments
 (0)