Skip to content
25 changes: 25 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,29 @@ async function* createAsyncIterator(stream, options) {
}
}

function staticUnref(stream) {
const unrefStream = new Readable({
objectMode: stream.readableObjectMode ?? stream.objectMode ?? true,
// highWaterMark 0 as unref is basically a proxy, so don't consume more data
// as we would lose it when continue consuming from the original stream
highWaterMark: 0,
Comment on lines +1171 to +1173
Copy link
Member Author

@rluvaton rluvaton May 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following test show why 0 (it's part of the tests suite also)

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);

  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

// TODO - what about other options?
destroy(err, callback) {
// Not destroying the stream here as we unref it.
callback(err);
},
}).wrap(stream);

unrefStream.once('error', (e) => {
if (e.name !== 'AbortError') {
destroyImpl.destroyer(stream, e);
}
});

return unrefStream;
}


// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down Expand Up @@ -1398,6 +1421,8 @@ Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};

Readable.unref = staticUnref;

let webStreamsAdapters;

// Lazy to avoid circular references
Expand Down
104 changes: 104 additions & 0 deletions test/parallel/test-stream-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict';
require('../common');

const {
Readable,
} = require('stream');
const { it } = require('node:test');
const { strictEqual, deepStrictEqual } = require('assert');

const { from, unref } = Readable;

const nextTick = () => new Promise((resolve) => process.nextTick(resolve));

it('unref stream error should propagate to original one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

let emittedError;
originalStream.on('error', (e) => {
emittedError = e;
});
const unrefStream = unref(originalStream);

const thrownError = new Error('test');

unrefStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

// Need another tick to propagate the error
await nextTick();
strictEqual(emittedError, thrownError);
});

it('Original stream error should propagate to unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

let emittedError;
unrefStream.on('error', (e) => {
emittedError = e;
});

const thrownError = new Error('test');

originalStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

await nextTick();
strictEqual(emittedError, thrownError);
});

it('Should not close original stream when unref one finished but not consumed all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

// eslint-disable-next-line no-unused-vars
for await (const _ of unrefStream) {
break;
}

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, false);
});

it('Should continue consuming the original stream data from where the unref stopped', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const firstItem = await unref(originalStream).take(1).toArray();
deepStrictEqual(firstItem, [1]);

const restOfData = await originalStream.toArray();
deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

it('Should close original stream when unref one consume all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

const data = await unrefStream.toArray();
deepStrictEqual(data, [1, 2, 3, 4, 5]);

await nextTick();

strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);
});

it('original stream close should close unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

await originalStream.toArray();

strictEqual(originalStream.destroyed, true);
strictEqual(unrefStream.destroyed, true);
});