Skip to content

stream: unref #48007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
38 changes: 38 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
SafeSet,
SymbolAsyncIterator,
Symbol,
ReflectApply,
} = primordials;

module.exports = Readable;
Expand Down Expand Up @@ -1154,6 +1155,39 @@ async function* createAsyncIterator(stream, options) {
}
}

function staticUnref(stream) {
// TODO - We can probably have a better way to do this
function unref() {
return async function* unref() {
const iterator = this.iterator({ destroyOnReturn: false });

for await (const val of iterator) {
yield val;
}
}.call(this);
}

// Instead of regular operator registration
const newStream = Readable.from(ReflectApply(unref, stream, []));

// TODO - we should have better way instead of listening to those events, no?
// stream.once('close', () => {
// destroyImpl.destroyer(newStream, null);
// });
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncommenting this fixes the original stream close should close unref one test but fails the Should close original stream when unref one consume all data test because the newStream is not finished yet for some reason so it will get an abort error

stream.once('error', (e) => {
destroyImpl.destroyer(newStream, e);
});

newStream.on('error', (e) => {
if (e.name !== 'AbortError') {
destroyImpl.destroyer(stream, e);
}
});

return newStream;
}


// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down Expand Up @@ -1398,6 +1432,10 @@ Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};

Readable.unref = function(stream) {
return staticUnref(stream);
};

let webStreamsAdapters;

// Lazy to avoid circular references
Expand Down
95 changes: 95 additions & 0 deletions test/parallel/test-stream-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';
require('../common');

const {
Readable,
} = require('stream');
const { it } = require('node:test');
const { strictEqual } = require('assert');

const { from, unref } = Readable;

const nextTick = () => new Promise((resolve) => process.nextTick(resolve));

it('unref stream error should propagate to original one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

let emittedError;
originalStream.on('error', (e) => {
emittedError = e;
});
const unrefStream = unref(originalStream);

const thrownError = new Error('test');

unrefStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

// Need another tick to propagate the error
await nextTick();
strictEqual(emittedError, thrownError);
});

it('Original stream error should propagate to unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

let emittedError;
unrefStream.on('error', (e) => {
emittedError = e;
});

const thrownError = new Error('test');

originalStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

await nextTick();
strictEqual(emittedError, thrownError);
});

it('Should not close original stream when unref one finished but not consumed all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

// eslint-disable-next-line no-unused-vars
for await (const _ of unrefStream) {
break;
}

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, false);
});

it('Should close original stream when unref one consume all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

// This throw an abort error
await unrefStream.toArray();

await nextTick();

strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);
});

// This fail
it('original stream close should close unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

await originalStream.toArray();

strictEqual(originalStream.destroyed, true);
strictEqual(unrefStream.destroyed, true);
});