Regression: Memory leak in RpcOutputStreamController when using callback API without async iterator #742

@gprojects-ch

Summary
Using RpcOutputStreamController with the callback API (onMessage, onNext, etc.) without ever acquiring an async iterator leads to unbounded memory growth.

This happens because _itState is always initialized and pushIt() always enqueues results, even when no async iterator is ever created to consume them.

Expected behavior
If the stream is consumed exclusively via callbacks and no async iterator is created, no internal iterator queue should accumulate.
Memory usage should remain stable.

Actual behavior
When using only callbacks:

  • _itState is initialized as { q: [] }
  • notifyMessage() → pushIt() is always called
  • Since state.p is undefined and no iterator exists, results are pushed into state.q
  • state.q is never consumed
  • The queue grows indefinitely
  • Memory leak occurs
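
The steps above can be illustrated with a deliberately simplified model. This is a sketch, not the library source: EagerController, its listeners array, and the queuedCount getter are hypothetical names used only to make the retention visible. It assumes the queue state exists from construction, as described in this issue.

```typescript
// Simplified, hypothetical model of the queueing path; NOT the library source.
type EagerQueueItem<T> = IteratorResult<T, null> | Error;

class EagerController<T> {
  // Mirrors the shape described in the issue: the state exists from construction.
  private itState: { q: EagerQueueItem<T>[] } = { q: [] };
  private listeners: Array<(message: T) => void> = [];

  onMessage(callback: (message: T) => void): void {
    this.listeners.push(callback);
  }

  notifyMessage(message: T): void {
    this.listeners.forEach((listener) => listener(message));
    // No iterator is waiting, so the result lands in the queue and stays there.
    this.itState.q.push({ value: message, done: false });
  }

  // Hypothetical helper, present only to observe how much is retained.
  get queuedCount(): number {
    return this.itState.q.length;
  }
}

const eager = new EagerController<number>();
let eagerDelivered = 0;
eager.onMessage(() => {
  eagerDelivered++;
});
for (let i = 0; i < 1000; i++) {
  eager.notifyMessage(i);
}
console.log(eagerDelivered, eager.queuedCount); // prints: 1000 1000
```

Every message reaches the callback, yet a second reference to each one is kept in the queue with nothing to remove it.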

Impact / real world scenario
This leak is not limited to synthetic tests.
It also occurs in real bidi streaming clients in a “send only” usage pattern: the client writes messages on the request stream while never consuming the response stream via async iteration.

Whenever the server emits responses or completion events, the output stream controller keeps enqueueing into _itState.q if only callbacks are used (or the response stream is otherwise not drained), causing unbounded retention.
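
Until a fix lands, draining the response stream in the background may serve as a stopgap in the send-only pattern, on the assumption that an active iterator consumes the queue as fast as it fills. The sketch below is not verified against the affected versions. drain and fakeResponses are hypothetical names, and fakeResponses merely stands in for a real response stream.

```typescript
// Hypothetical helper: pulls every item from an async iterable and discards it.
async function drain<T>(source: AsyncIterable<T>): Promise<number> {
  const iterator = source[Symbol.asyncIterator]();
  let count = 0;
  // Each next() call removes one pending result from the producer side.
  while (!(await iterator.next()).done) {
    count++;
  }
  return count;
}

// Stand-in for a response stream; a real client would pass its own stream here.
async function* fakeResponses(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

const drained: Promise<number> = drain(fakeResponses());
drained.then((count) => console.log("drained", count)); // prints: drained 3
```

In a real client the returned promise would typically be left running next to the send loop, with a catch handler attached so that stream errors do not surface as unhandled rejections.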

Root cause
Recent changes replaced:

private _itState: undefined | {
  p?: Deferred<IteratorResult<T, null>>,
  q: Array<IteratorResult<T, null> | Error>,
};

with:

private _itState: {
  p?: Deferred<IteratorResult<T, null>>,
  q: Array<IteratorResult<T, null> | Error>,
} = { q: [] };

Additionally:

  • Initialization inside Symbol.asyncIterator() was removed
  • The guard if (!state) return; inside pushIt() was removed

As a result, the iterator queue is always active, even if no iterator is used.

Regression
This behavior does not occur in version 2.9.6.
It appears starting from 2.10.0, most likely introduced by the changes related to Issue 650.
This indicates a regression in RpcOutputStreamController iterator state handling.

Minimal reproduction

import { RpcOutputStreamController } from "@protobuf-ts/runtime-rpc";

const stream = new RpcOutputStreamController<any>();

stream.onMessage(() => {
  // callback only
});

function createLargeObject(i: number) {
  return {
    id: i,
    timestamp: Date.now(),
    name: "test-message-" + i,
    metadata: {
      source: "my-client",
      region: "eu-central",
      version: "1.0.0",
    },
    values: Array.from({ length: 20 }, (_, j) => j + i),
    payload: "X".repeat(10_000), // 10 KB string
  };
}

for (let i = 0; i < 1_000_000; i++) {
  stream.notifyMessage(createLargeObject(i));
}

The heap grows steadily because state.q retains every emitted message.

Proposed fix
Restore lazy initialization of _itState and guard pushIt():

private pushIt(result: IteratorResult<T, null> | Error): void {
  const state = this._itState;
  if (!state) return;
  ...
}

This restores correct behavior:

  • Callback-only usage does not enqueue
  • Iterator usage keeps proper queueing semantics
  • Error-before-iterator behavior remains intact
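
The first two points can be checked against a simplified model of the proposed guard. This is a sketch under stated assumptions, not the actual patch: LazyController, acquireIterator (a stand-in for Symbol.asyncIterator()), and queuedCount are hypothetical, and error handling is left out.

```typescript
// Simplified, hypothetical model of the proposed fix; NOT the actual patch.
type LazyQueueItem<T> = IteratorResult<T, null> | Error;

class LazyController<T> {
  // Stays undefined until an iterator is requested.
  private itState: { q: LazyQueueItem<T>[] } | undefined;
  private listeners: Array<(message: T) => void> = [];

  onMessage(callback: (message: T) => void): void {
    this.listeners.push(callback);
  }

  notifyMessage(message: T): void {
    this.listeners.forEach((listener) => listener(message));
    this.pushIt({ value: message, done: false });
  }

  // Hypothetical stand-in for Symbol.asyncIterator(): creates the state lazily.
  acquireIterator(): void {
    if (!this.itState) {
      this.itState = { q: [] };
    }
  }

  private pushIt(result: LazyQueueItem<T>): void {
    const state = this.itState;
    if (!state) return; // no iterator was ever requested, so nothing is retained
    state.q.push(result);
  }

  // Hypothetical helper, present only to observe how much is retained.
  get queuedCount(): number {
    return this.itState?.q.length ?? 0;
  }
}

const lazy = new LazyController<number>();
lazy.onMessage(() => {});
for (let i = 0; i < 1000; i++) {
  lazy.notifyMessage(i);
}
const queuedWithoutIterator = lazy.queuedCount;

lazy.acquireIterator();
for (let i = 0; i < 3; i++) {
  lazy.notifyMessage(i);
}
const queuedWithIterator = lazy.queuedCount;
console.log(queuedWithoutIterator, queuedWithIterator); // prints: 0 3
```

Callback-only usage retains nothing, while messages sent after an iterator has been requested are queued for it as before.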

Additional context
This issue is observable in long-running streaming scenarios where only callbacks are used. Heap snapshots show state.q retaining all emitted messages.

File and reference
Affected file
packages/runtime-rpc/src/rpc-output-stream.ts
https://github.com/timostamm/protobuf-ts/blob/main/packages/runtime-rpc/src/rpc-output-stream.ts

This leak appears to have been introduced by the fix discussed in issue #650.

Environment
Working version:
"@protobuf-ts/runtime-rpc": 2.9.6

Broken version:
"@protobuf-ts/runtime-rpc": 2.10.0

Node.js: 24.8.0
