Skip to content

events: add addDisposableListener method to EventEmitter #58453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,33 @@ changes:

The `'removeListener'` event is emitted _after_ the `listener` is removed.

### `emitter.addDisposableListener(eventName, listener[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `eventName` {string|symbol} The name of the event.
* `listener` {Function} The callback function
* `options` {Object}
* `once` {boolean} If `true`, the listener will be removed after being called
once.
* Returns: {Object} An object with a dispose method that will remove the listener.
The function will also have a `Symbol.dispose` method so the function can
be used with the `using` keyword.
Comment on lines +520 to +522
Copy link
Member

@legendecas legendecas Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an anonymous object in the API makes it hard to discover, like searching API interfaces with [Symbol.dispose] method. Additionally, the properties of the anynomous objects are not as clear as a named types. I'd prefer a named interface for the event listener disposable type.


```mjs
import { EventEmitter } from 'node:events';
const myEmitter = new EventEmitter();
{
using disposer = myEmitter.addDisposableListener('event', console.log);
console.log(myEmitter.listenerCount('event')); // Prints: 1
}
console.log(myEmitter.listenerCount('event')); // Prints: 0
```

### `emitter.addListener(eventName, listener)`

<!-- YAML
Expand Down
42 changes: 42 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -1210,3 +1210,45 @@ function listenersController() {
},
};
}

function makeDisposer(self, type, listener) {
return {
dispose() {
if (self === undefined) return;
self.removeListener(type, listener);
self = undefined;
},
[SymbolDispose]() {
this.dispose();
},
};
}

/**
* A variation on `addListener` that returns a function that can be called
* to remove the listener. The function includes a Symbol.dispose property
* that allows the function to be used with `using` statements.
* @param {string|symbol} type
* @param {Function} listener
* @param {{
* once?: boolean;
* }} [options]
* @returns {{ dispose: Function, [SymbolDispose]: Function }}
*/
function addDisposableListener(type, listener, options = kEmptyObject) {
validateObject(options, 'options');
const {
once = false,
} = options;
validateBoolean(once, 'options.once');
if (once) {
this.once(type, listener);
} else {
this.on(type, listener);
}

// We use a function to create the disposer to further limiit what
// the closure captures.
return makeDisposer(this, type, listener);
};
EventEmitter.prototype.addDisposableListener = addDisposableListener;
39 changes: 17 additions & 22 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const {
Promise,
PromisePrototypeThen,
SymbolDispose,
globalThis: { DisposableStack },
} = primordials;

const {
Expand Down Expand Up @@ -177,36 +178,38 @@ function eos(stream, options, callback) {
callback.call(stream);
};

const disposableStack = new DisposableStack();

const onrequest = () => {
stream.req.on('finish', onfinish);
disposableStack.use(stream.req.addDisposableListener('finish', onfinish));
};

if (isRequest(stream)) {
stream.on('complete', onfinish);
disposableStack.use(stream.addDisposableListener('complete', onfinish));
if (!willEmitClose) {
stream.on('abort', onclose);
disposableStack.use(stream.addDisposableListener('abort', onclose));
}
if (stream.req) {
onrequest();
} else {
stream.on('request', onrequest);
disposableStack.use(stream.addDisposableListener('request', onrequest));
}
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
disposableStack.use(stream.addDisposableListener('end', onlegacyfinish));
disposableStack.use(stream.addDisposableListener('close', onlegacyfinish));
}

// Not all streams will emit 'close' after 'aborted'.
if (!willEmitClose && typeof stream.aborted === 'boolean') {
stream.on('aborted', onclose);
disposableStack.use(stream.addDisposableListener('aborted', onclose));
}

stream.on('end', onend);
stream.on('finish', onfinish);
disposableStack.use(stream.addDisposableListener('end', onend));
disposableStack.use(stream.addDisposableListener('finish', onfinish));
if (options.error !== false) {
stream.on('error', onerror);
disposableStack.use(stream.addDisposableListener('error', onerror));
}
stream.on('close', onclose);
disposableStack.use(stream.addDisposableListener('close', onclose));

if (closed) {
process.nextTick(onclose);
Expand All @@ -233,18 +236,10 @@ function eos(stream, options, callback) {

const cleanup = () => {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
disposableStack.dispose();
};
// Arrange for the cleanup function to call itself when disposed.
cleanup[SymbolDispose] = cleanup;

if (options.signal && !closed) {
const abort = () => {
Expand Down
28 changes: 16 additions & 12 deletions lib/internal/test_runner/harness.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ const {
PromiseWithResolvers,
SafeMap,
SafePromiseAllReturnVoid,
globalThis: { DisposableStack },
} = primordials;

const { getCallerLocation } = internalBinding('util');
const {
createHook,
Expand Down Expand Up @@ -230,6 +232,9 @@ function setupProcessState(root, globalOptions) {
const rejectionHandler =
createProcessEventHandler('unhandledRejection', root);
const coverage = configureCoverage(root, globalOptions);

const disposableStack = new DisposableStack();

const exitHandler = async (kill) => {
if (root.subtests.length === 0 && (root.hooks.before.length > 0 || root.hooks.after.length > 0)) {
// Run global before/after hooks in case there are no tests
Expand All @@ -254,27 +259,26 @@ function setupProcessState(root, globalOptions) {
}

hook.disable();
process.removeListener('uncaughtException', exceptionHandler);
process.removeListener('unhandledRejection', rejectionHandler);
process.removeListener('beforeExit', exitHandler);
if (globalOptions.isTestRunner) {
process.removeListener('SIGINT', terminationHandler);
process.removeListener('SIGTERM', terminationHandler);
}
disposableStack.dispose();
};

const terminationHandler = async () => {
await exitHandler(true);
process.exit();
};

process.on('uncaughtException', exceptionHandler);
process.on('unhandledRejection', rejectionHandler);
process.on('beforeExit', exitHandler);
disposableStack.use(
process.addDisposableListener('uncaughtException', exceptionHandler));
disposableStack.use(
process.addDisposableListener('unhandledRejection', rejectionHandler));
disposableStack.use(
process.addDisposableListener('beforeExit', exitHandler));
// TODO(MoLow): Make it configurable to hook when isTestRunner === false.
if (globalOptions.isTestRunner) {
process.on('SIGINT', terminationHandler);
process.on('SIGTERM', terminationHandler);
disposableStack.use(
process.addDisposableListener('SIGINT', terminationHandler));
disposableStack.use(
process.addDisposableListener('SIGTERM', terminationHandler));
}

root.harness.coverage = FunctionPrototypeBind(collectCoverage, null, root, coverage);
Expand Down
48 changes: 48 additions & 0 deletions test/parallel/test-events-disposable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const common = require('../common');
const { strictEqual, throws } = require('assert');
const { EventEmitter } = require('events');

const emitter = new EventEmitter();

{
// Verify that the disposable stack removes the handlers
// when the stack is disposed.
using ds = new DisposableStack();
ds.use(emitter.addDisposableListener('foo', common.mustCall()));
ds.use(emitter.addDisposableListener('bar', common.mustCall()));
ds.use(emitter.addDisposableListener('baz', common.mustNotCall()),
{ once: true });
emitter.emit('foo');
emitter.emit('bar');
strictEqual(emitter.listenerCount('foo'), 1);
strictEqual(emitter.listenerCount('bar'), 1);

// The disposer returned by addDisposableListener can be called manually.
const disposer = emitter.addDisposableListener('foo', common.mustNotCall());
strictEqual(emitter.listenerCount('foo'), 2);
disposer.dispose();
strictEqual(emitter.listenerCount('foo'), 1);
// Disposer is callable multiple times without error.
disposer.dispose();
}
emitter.emit('foo');
emitter.emit('bar');
emitter.emit('baz');
strictEqual(emitter.listenerCount('foo'), 0);
strictEqual(emitter.listenerCount('bar'), 0);

// ============================================================================
// Type checking on inputs
throws(() => emitter.addDisposableListener('foo', 'not a function'), {
code: 'ERR_INVALID_ARG_TYPE',
});

throws(() => emitter.addDisposableListener('foo', () => {}, ''), {
code: 'ERR_INVALID_ARG_TYPE',
});

throws(() => emitter.addDisposableListener('foo', () => {}, { once: '' }), {
code: 'ERR_INVALID_ARG_TYPE',
});
Loading