Skip to content

Commit 4521563

Browse files
committed
lib: add support for readable byte streams to .toWeb()
1 parent 0c6e16b commit 4521563

File tree

6 files changed

+135
-9
lines changed

6 files changed

+135
-9
lines changed

doc/api/stream.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3181,6 +3181,8 @@ changes:
31813181
If no value is provided, the size will be `1` for all the chunks.
31823182
* `chunk` {any}
31833183
* Returns: {number}
3184+
* `type` {string} Must be 'bytes' or undefined.
3185+
If `type` is set to 'bytes', the `strategy` option is ignored
31843186
* Returns: {ReadableStream}
31853187

31863188
### `stream.Writable.fromWeb(writableStream[, options])`
@@ -3348,7 +3350,7 @@ duplex.write('hello');
33483350
duplex.once('readable', () => console.log('readable', duplex.read()));
33493351
```
33503352

3351-
### `stream.Duplex.toWeb(streamDuplex)`
3353+
### `stream.Duplex.toWeb(streamDuplex[, options])`
33523354

33533355
<!-- YAML
33543356
added: v17.0.0
@@ -3359,6 +3361,8 @@ changes:
33593361
-->
33603362

33613363
* `streamDuplex` {stream.Duplex}
3364+
* `options` {Object}
3365+
* `type` {string}
33623366
* Returns: {Object}
33633367
* `readable` {ReadableStream}
33643368
* `writable` {WritableStream}

lib/internal/streams/duplex.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,8 @@ Duplex.fromWeb = function(pair, options) {
191191
options);
192192
};
193193

194-
Duplex.toWeb = function(duplex) {
195-
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex);
194+
Duplex.toWeb = function(duplex, options) {
195+
return lazyWebStreams().newReadableWritablePairFromDuplex(duplex, options);
196196
};
197197

198198
let duplexify;

lib/internal/webstreams/adapters.js

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj
418418
* @param {Readable} streamReadable
419419
* @param {{
420420
* strategy : QueuingStrategy
421+
* type : string,
421422
* }} [options]
422423
* @returns {ReadableStream}
423424
*/
@@ -432,13 +433,15 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
432433
'stream.Readable',
433434
streamReadable);
434435
}
436+
const isBYOB = options?.type === 'bytes';
435437

436438
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
437439
const readable = new ReadableStream();
438440
readable.cancel();
439441
return readable;
440442
}
441443

444+
442445
const objectMode = streamReadable.readableObjectMode;
443446
const highWaterMark = streamReadable.readableHighWaterMark;
444447

@@ -491,15 +494,27 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj
491494
streamReadable.on('data', onData);
492495

493496
return new ReadableStream({
494-
start(c) { controller = c; },
497+
type: isBYOB ? 'bytes' : undefined,
498+
start(c) {
499+
controller = c;
500+
if (isBYOB) {
501+
streamReadable.once('end', () => {
502+
// close the controller
503+
controller.close();
504+
// And unlock the last BYOB read request
505+
controller.byobRequest?.respond(0);
506+
wasCanceled = true;
507+
});
508+
}
509+
},
495510

496511
pull() { streamReadable.resume(); },
497512

498513
cancel(reason) {
499514
wasCanceled = true;
500515
destroy(streamReadable, reason);
501516
},
502-
}, strategy);
517+
}, isBYOB ? undefined : strategy);
503518
}
504519

505520
/**
@@ -601,9 +616,10 @@ function newStreamReadableFromReadableStream(readableStream, options = kEmptyObj
601616

602617
/**
603618
* @param {Duplex} duplex
619+
* @param {{ type?:string}} [options]
604620
* @returns {ReadableWritablePair}
605621
*/
606-
function newReadableWritablePairFromDuplex(duplex) {
622+
function newReadableWritablePairFromDuplex(duplex, options = kEmptyObject) {
607623
// Not using the internal/streams/utils isWritableNodeStream and
608624
// isReadableNodeStream utilities here because they will return false
609625
// if the duplex was created with writable or readable options set to
@@ -615,9 +631,11 @@ function newReadableWritablePairFromDuplex(duplex) {
615631
throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
616632
}
617633

634+
validateObject(options, 'options');
635+
618636
if (isDestroyed(duplex)) {
619637
const writable = new WritableStream();
620-
const readable = new ReadableStream();
638+
const readable = new ReadableStream(options);
621639
writable.close();
622640
readable.cancel();
623641
return { readable, writable };
@@ -633,8 +651,8 @@ function newReadableWritablePairFromDuplex(duplex) {
633651

634652
const readable =
635653
isReadable(duplex) ?
636-
newReadableStreamFromStreamReadable(duplex) :
637-
new ReadableStream();
654+
newReadableStreamFromStreamReadable(duplex, options) :
655+
new ReadableStream(options);
638656

639657
if (!isReadable(duplex))
640658
readable.cancel();

test/parallel/test-stream-duplex.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,26 @@ process.on('exit', () => {
131131
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
132132
}));
133133
}
134+
135+
// Duplex.toWeb BYOB
136+
{
137+
const dataToRead = Buffer.from('hello');
138+
const dataToWrite = Buffer.from('world');
139+
140+
const duplex = Duplex({
141+
read() {
142+
this.push(dataToRead);
143+
this.push(null);
144+
},
145+
write: common.mustCall((chunk) => {
146+
assert.strictEqual(chunk, dataToWrite);
147+
})
148+
});
149+
150+
const { writable, readable } = Duplex.toWeb(duplex, { type: 'bytes' });
151+
writable.getWriter().write(dataToWrite);
152+
const data = new Uint8Array(dataToRead.length);
153+
readable.getReader({ mode: 'byob' }).read(data).then(common.mustCall((result) => {
154+
assert.deepStrictEqual(Buffer.from(result.value), dataToRead);
155+
}));
156+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (!common.hasCrypto) { common.skip('missing crypto'); }
4+
5+
const { Readable } = require('stream');
6+
const process = require('process');
7+
const { randomBytes } = require('crypto');
8+
const assert = require('assert');
9+
10+
// Based on: https://github.com/nodejs/node/issues/46347#issuecomment-1413886707
11+
// edit: make it cross-platform as /dev/urandom is not available on Windows
12+
{
13+
let currentMemoryUsage = process.memoryUsage().arrayBuffers;
14+
15+
// We initialize a stream, but not start consuming it
16+
const randomNodeStream = new Readable({
17+
read(size) {
18+
randomBytes(size, (err, buffer) => {
19+
if (err) {
20+
// If an error occurs, emit an 'error' event
21+
this.emit('error', err);
22+
return;
23+
}
24+
25+
// Push the random bytes to the stream
26+
this.push(buffer);
27+
});
28+
}
29+
});
30+
// after 2 seconds, it'll get converted to web stream
31+
let randomWebStream;
32+
33+
// We check memory usage every second
34+
// since it's a stream, it shouldn't be higher than the chunk size
35+
const reportMemoryUsage = () => {
36+
const { arrayBuffers } = process.memoryUsage();
37+
currentMemoryUsage = arrayBuffers;
38+
39+
assert(currentMemoryUsage <= 256 * 1024 * 1024);
40+
};
41+
setInterval(reportMemoryUsage, 1000);
42+
43+
// after 1 second we use Readable.toWeb
44+
// memory usage should stay pretty much the same since it's still a stream
45+
setTimeout(() => {
46+
randomWebStream = Readable.toWeb(randomNodeStream, { type: 'bytes' });
47+
}, 1000);
48+
49+
// after 2 seconds we start consuming the stream
50+
// memory usage will grow, but the old chunks should be garbage-collected pretty quickly
51+
setTimeout(async () => {
52+
53+
const reader = randomWebStream.getReader({ mode: 'byob' });
54+
55+
let done = false;
56+
while (!done) {
57+
// Read a 16 bytes of data from the stream
58+
const result = await reader.read(new Uint8Array(16));
59+
done = result.done;
60+
// We consume the stream, but we don't do anything with the data
61+
// This is to ensure that the stream is being consumed
62+
// and that the memory usage is being reported correctly
63+
}
64+
}, 2000);
65+
66+
setTimeout(() => {
67+
// Test considered passed if we don't crash
68+
process.exit(0);
69+
}, 5000);
70+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
'use strict';
2+
require('../common');
3+
const { Readable } = require('stream');
4+
{
5+
const r = Readable.from([]);
6+
// Cancelling reader while closing should not cause uncaught exceptions
7+
r.on('close', () => reader.cancel());
8+
9+
const reader = Readable.toWeb(r, { type: 'bytes' }).getReader({ mode: 'byob' });
10+
reader.read(new Uint8Array(16));
11+
}

0 commit comments

Comments
 (0)