
fix: preserve streaming usage after AbortSignal cancellation #1178

Open

wsk-builds wants to merge 6 commits into openai:main from wsk-builds:fix/retain-streaming-usage-on-abort

Conversation

@wsk-builds
Contributor

Summary

  • preserve the latest streaming usage snapshot in agents-core so aborted streams keep usage instead of waiting for response_done
  • expose normalized usage snapshots from OpenAI Responses response.in_progress raw model events
  • add regression coverage for snapshot replacement, abort preservation, and provider snapshot mapping

Why

This pull request resolves #995.

Streaming usage was only accumulated when a terminal response_done event arrived. When callers aborted a streamed run early, result.state.usage could remain at zero even though the provider had already reported usage, which breaks billing, quota, and observability workflows.
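
The failure mode can be sketched with a toy replay of a stream (event names and shapes here are illustrative, not the SDK's actual internals):

```javascript
// Toy model of the pre-fix behavior: usage is only accumulated when the
// terminal response_done event arrives, so aborting early loses it entirely
// even though the provider already reported usage mid-stream.
function consumeStream(events, { abortAfter = null } = {}) {
  const usage = { totalTokens: 0 };
  let processed = 0;
  for (const event of events) {
    if (abortAfter != null && processed >= abortAfter) break; // caller aborts
    processed += 1;
    if (event.type === 'response_done') {
      usage.totalTokens += event.usage.totalTokens; // only the terminal event counts
    }
  }
  return usage;
}

const events = [
  { type: 'response_in_progress', usage: { totalTokens: 120 } }, // early usage report
  { type: 'output_text_delta' },
  { type: 'response_done', usage: { totalTokens: 340 } },
];

console.log(consumeStream(events).totalTokens);                    // completed run: 340
console.log(consumeStream(events, { abortAfter: 2 }).totalTokens); // aborted run: 0
```

The aborted run ends with zero usage even though the in-progress event had already carried a usable total, which is exactly the gap this PR closes.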

What Changed

  • added Usage.replaceCurrentRequestSnapshot() and wired the streaming runner to replace the in-flight request usage snapshot instead of only adding usage on response_done
  • added getUsageSnapshotFromStreamEvent() so runners can consume normalized provider usage snapshots from raw model events
  • updated OpenAI Responses streaming conversion to attach a normalized usageSnapshot to response.in_progress raw events and reused a shared usage conversion helper
  • added agents-core and agents-openai regression tests plus a patch changeset
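
The replace-instead-of-add semantics can be reduced to a toy sketch (hypothetical shape; the real `Usage.replaceCurrentRequestSnapshot()` in agents-core tracks more fields):

```javascript
// Illustrative reduction: each in-progress snapshot REPLACES the current
// request's usage rather than adding to it, so repeated snapshots and the
// final response_done totals never double-count the same request.
class ToyUsage {
  constructor() {
    this.currentRequest = { totalTokens: 0 };
  }
  replaceCurrentRequestSnapshot(snapshot) {
    this.currentRequest = { ...snapshot }; // supersede, do not accumulate
  }
  get totalTokens() {
    return this.currentRequest.totalTokens;
  }
}

const usage = new ToyUsage();
usage.replaceCurrentRequestSnapshot({ totalTokens: 80 });  // first snapshot
usage.replaceCurrentRequestSnapshot({ totalTokens: 120 }); // later snapshot wins
console.log(usage.totalTokens); // 120, not 200
```

Replacement is what makes it safe to apply snapshots on every in-progress event: if the stream later completes, the terminal totals simply supersede the last snapshot.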

Testing

  • pnpm exec vitest run packages/agents-core/test/usage.test.ts packages/agents-core/test/run.stream.test.ts packages/agents-openai/test/openaiResponsesModel.test.ts
  • pnpm exec prettier --check packages/agents-core/src/usage.ts packages/agents-core/src/runner/streaming.ts packages/agents-core/src/run.ts packages/agents-core/test/usage.test.ts packages/agents-core/test/run.stream.test.ts packages/agents-openai/src/openaiResponsesModel.ts packages/agents-openai/test/openaiResponsesModel.test.ts
  • CI=1 pnpm i --offline
  • pnpm build
  • pnpm -r build-check
  • pnpm -r -F "@openai/*" dist:check
  • pnpm lint
  • pnpm test (fails in the existing packages/agents-core/test/result.test.ts leak checks because spawnSync(process.execPath, ['--import', 'tsx', ...]) captures empty stdout in this environment; the two manual leak scripts still pass when run directly)

Risks / Follow-ups

  • providers only contribute early usage when they attach normalized usageSnapshot data; providers without that signal keep the existing response_done behavior
  • the full suite still has the existing tsx/spawnSync stdout-capture issue noted above
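
A provider opting in might map raw in-progress usage roughly like this (field names are assumptions based on the Responses wire format; the real conversion lives in the agents-openai adapter):

```javascript
// Hypothetical normalization of a raw response.in_progress event into the
// usage snapshot shape a runner could consume. Providers that return
// undefined here simply keep the existing response_done-only behavior.
function getUsageSnapshotFromRawEvent(rawEvent) {
  const usage = rawEvent?.response?.usage;
  if (rawEvent?.type !== 'response.in_progress' || !usage) {
    return undefined;
  }
  return {
    inputTokens: usage.input_tokens ?? 0,
    outputTokens: usage.output_tokens ?? 0,
    totalTokens: usage.total_tokens ?? 0,
  };
}

const snapshot = getUsageSnapshotFromRawEvent({
  type: 'response.in_progress',
  response: { usage: { input_tokens: 50, output_tokens: 70, total_tokens: 120 } },
});
console.log(snapshot); // { inputTokens: 50, outputTokens: 70, totalTokens: 120 }
console.log(getUsageSnapshotFromRawEvent({ type: 'response.created' })); // undefined
```

Because the snapshot is opt-in per event, providers that never emit early usage are unaffected by the new code path.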

@changeset-bot

changeset-bot Bot commented Apr 16, 2026

🦋 Changeset detected

Latest commit: 87204f3

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 5 packages:

  • @openai/agents-core: Patch
  • @openai/agents-openai: Patch
  • @openai/agents-extensions: Patch
  • @openai/agents-realtime: Patch
  • @openai/agents: Patch



@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8ae0af9247

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread packages/agents-core/src/usage.ts
Comment thread packages/agents-core/src/usage.ts Outdated
@seratch seratch marked this pull request as draft April 17, 2026 05:41
@wsk-builds
Contributor Author

Addressed both Codex review findings for the streaming usage snapshot path.

The earlier streaming snapshot issue was caused by replacement logic that read index 0. This has been updated to replace the full trackedRequestUsageEntries set instead, so failed retry placeholders and the final successful request entry remain consistent with the aggregate totals.
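
The difference between the two replacement strategies can be sketched with a toy entry list (illustrative shapes only, not the actual agents-core data structures):

```javascript
// Toy illustration of why index-0 replacement was wrong: when a failed-retry
// placeholder sits at the front of the list, overwriting index 0 clobbers the
// placeholder instead of the in-flight request. Replacing the tracked set
// keeps placeholders and the aggregate totals consistent.
function replaceIndexZero(entries, snapshot) {
  const next = entries.slice();
  next[0] = { ...snapshot }; // buggy: may overwrite a retry placeholder
  return next;
}

function replaceTrackedSet(entries, trackedIds, snapshot) {
  // keep untracked entries (e.g. failed retries), swap only the tracked request
  return [...entries.filter((entry) => !trackedIds.has(entry.id)), { ...snapshot }];
}

const entries = [
  { id: 'retry-1', totalTokens: 30 }, // failed retry placeholder
  { id: 'req-2', totalTokens: 0 },    // in-flight request
];
const snapshot = { id: 'req-2', totalTokens: 120 };

console.log(replaceIndexZero(entries, snapshot));
// retry placeholder lost, duplicate req-2 entries
console.log(replaceTrackedSet(entries, new Set(['req-2']), snapshot));
// retry placeholder preserved, in-flight entry replaced
```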

Also added targeted regression tests covering this behavior in:

packages/agents-core/test/usage.test.ts
packages/agents-core/test/run.stream.test.ts
packages/agents-openai/test/openaiResponsesModel.test.ts

Re-ran the required verification stack locally, and everything passed:

pnpm i
pnpm build
pnpm -r build-check
pnpm -r -F "@openai/*" dist:check
pnpm lint
pnpm test

@wsk-builds wsk-builds marked this pull request as ready for review April 17, 2026 15:28
@seratch
Member

seratch commented Apr 18, 2026

Thanks for sharing this. At a glance, this may be good, but I don't yet have confidence in how effective it is.

If you're willing to spend more time on this, can you use the $runtime-behavior-probe skill in this repo to verify the runtime behavior is correctly improved in many real scenarios?

@wsk-builds wsk-builds marked this pull request as draft April 19, 2026 13:56
@wsk-builds
Contributor Author

I used the runtime-behavior-probe workflow to compare main against this branch locally.

In controlled streaming scenarios where the provider surfaces usage on response.in_progress, main still loses usage after AbortSignal cancellation, while this branch preserves the latest usage snapshot and does not regress the normal completed-stream or retry-adjusted accounting paths.

What remains unverified is real-world effectiveness against live Responses streams, because that depends on whether the OpenAI service actually emits in-progress usage before the terminal event in those scenarios.

To make that easy to verify, I prepared a ready-to-run live probe that a maintainer can execute with their own OPENAI_API_KEY. It runs a completed control plus early/mid/late abort cases, records whether response.in_progress carried usage, and captures the final result.state.usage after cancellation.

I'm pasting the temporary script and exact run instructions in the next comment so it can be executed on main and on this branch with the same model and repeat count.

@wsk-builds
Contributor Author

Ready-to-run live probe and exact instructions

This probe is meant to validate real OpenAI Responses streaming behavior against a checkout that already has dependencies installed. It compares:

  • a completed control
  • an early abort
  • a mid-stream abort
  • a late abort

It records:

  • whether response.in_progress was observed
  • whether any response.in_progress event carried a usage snapshot
  • whether result.state.usage remained non-zero after abort
  • whether the stream actually completed before the abort materially interrupted it

The script reads these environment variables:

  • OPENAI_API_KEY: required
  • OPENAI_BASE_URL: optional, only if a proxy/gateway is used
  • LIVE_PROBE_MODEL: optional, defaults to gpt-4.1-mini
  • LIVE_PROBE_REPEATS: optional, defaults to 3
  • LIVE_PROBE_MAX_TOKENS: optional, defaults to 2400
  • PROBE_OUTPUT_DIR: optional

The probe uses only synthetic prompts and sets store: false. It does not mutate remote state, but it does create billable API requests.

How to run

  1. Save the script below as /tmp/live-probe.mjs
  2. Run it on main
  3. Run it again on this branch
  4. Compare the resulting results.json files

Use the same model and repeat count on both branches.

Run on main from the repository root:

tmpdir=$(mktemp -d)

pnpm build
export OPENAI_API_KEY=...
export LIVE_PROBE_MODEL=gpt-4.1-mini
export LIVE_PROBE_REPEATS=3
export PROBE_OUTPUT_DIR="$tmpdir/main"
node /tmp/live-probe.mjs

Run on this branch from the repository root:

pnpm build
export OPENAI_API_KEY=...
export LIVE_PROBE_MODEL=gpt-4.1-mini
export LIVE_PROBE_REPEATS=3
export PROBE_OUTPUT_DIR="$tmpdir/candidate"
node /tmp/live-probe.mjs

Quick comparison:

jq '{model: .context.model, case_summaries}' "$tmpdir/main/results.json"
jq '{model: .context.model, case_summaries}' "$tmpdir/candidate/results.json"

Interpretation:

  • expected on an abort case means:
    • a usage-bearing response.in_progress event was seen before abort
    • final result.state.usage.totalTokens stayed non-zero after abort
  • negative on an abort case means:
    • a usage-bearing response.in_progress event was seen before abort
    • final usage still ended at zero
  • blocked on an abort case usually means:
    • the live stream never exposed a usable snapshot before abort, or
    • the response completed before the abort materially interrupted the stream

If main shows negative and this branch shows expected for the same abort case, that is strong live evidence that the branch improves runtime behavior.

If both branches are mostly blocked, then the limiting factor is likely the live service not emitting a usable in-progress usage snapshot in those runs, not the SDK replacement logic.

Temporary script

import { mkdirSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join, resolve } from 'node:path';
import OpenAI from 'openai';
import { Agent, Runner } from '@openai/agents-core';
import { OpenAIResponsesModel } from '@openai/agents-openai';

const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
if (!OPENAI_API_KEY) {
  throw new Error(
    'Missing OPENAI_API_KEY. Set it before running this live probe.',
  );
}

const MODEL = process.env.LIVE_PROBE_MODEL ?? 'gpt-4.1-mini';
const REPEATS = parsePositiveInt(process.env.LIVE_PROBE_REPEATS, 3);
const MAX_OUTPUT_TOKENS = parsePositiveInt(
  process.env.LIVE_PROBE_MAX_TOKENS,
  2400,
);
const OUTPUT_DIR = resolve(
  process.env.PROBE_OUTPUT_DIR ??
    join(
      tmpdir(),
      `openai-agents-live-probe-${Date.now().toString(36)}`,
    ),
);

const CASES = [
  {
    case_id: 'C1',
    scenario: 'completed control',
    abort_after_chars: null,
  },
  {
    case_id: 'E1',
    scenario: 'early abort',
    abort_after_chars: 80,
  },
  {
    case_id: 'M1',
    scenario: 'mid abort',
    abort_after_chars: 600,
  },
  {
    case_id: 'L1',
    scenario: 'late abort',
    abort_after_chars: 1800,
  },
];

function parsePositiveInt(value, fallback) {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

function serializeError(error) {
  if (error instanceof Error) {
    return {
      name: error.name,
      message: error.message,
      stack: error.stack ?? null,
    };
  }
  return {
    name: 'UnknownError',
    message: String(error),
    stack: null,
  };
}

function serializeUsage(usage) {
  return {
    requests: usage?.requests ?? 0,
    inputTokens: usage?.inputTokens ?? 0,
    outputTokens: usage?.outputTokens ?? 0,
    totalTokens: usage?.totalTokens ?? 0,
    inputTokensDetails: Array.isArray(usage?.inputTokensDetails)
      ? usage.inputTokensDetails
      : [],
    outputTokensDetails: Array.isArray(usage?.outputTokensDetails)
      ? usage.outputTokensDetails
      : [],
    requestUsageEntries: Array.isArray(usage?.requestUsageEntries)
      ? usage.requestUsageEntries.map((entry) => ({
          inputTokens: entry?.inputTokens ?? 0,
          outputTokens: entry?.outputTokens ?? 0,
          totalTokens: entry?.totalTokens ?? 0,
          inputTokensDetails: entry?.inputTokensDetails ?? {},
          outputTokensDetails: entry?.outputTokensDetails ?? {},
          endpoint: entry?.endpoint ?? null,
        }))
      : [],
  };
}

function makePrompt(runLabel) {
  return [
    'You are participating in a streaming runtime probe.',
    'Output exactly 160 numbered lines.',
    'Each line must be a short factual statement about mechanical clocks.',
    'Each line must be between 8 and 12 words.',
    'Do not add any introduction or conclusion.',
    `Probe nonce: ${runLabel}.`,
  ].join('\n');
}

function classifyAttempt(definition, attempt) {
  if (attempt.error) {
    return {
      result_flag: 'negative',
      observation_summary: `Run failed with ${attempt.error.name}: ${attempt.error.message}.`,
    };
  }

  if (definition.abort_after_chars == null) {
    if (
      attempt.raw_event_counts.response_completed > 0 &&
      attempt.final_usage.totalTokens > 0
    ) {
      return {
        result_flag: 'expected',
        observation_summary:
          'Completed control reached response.completed and returned non-zero usage.',
      };
    }
    return {
      result_flag: 'negative',
      observation_summary:
        'Completed control did not return both response.completed and non-zero final usage.',
    };
  }

  if (!attempt.abort_triggered) {
    return {
      result_flag: 'blocked',
      observation_summary:
        'Abort threshold was not reached before the response ended.',
    };
  }

  if (attempt.raw_event_counts.response_completed > 0) {
    return {
      result_flag: 'blocked',
      observation_summary:
        'The model completed before AbortSignal materially interrupted the stream.',
    };
  }

  if (attempt.raw_event_counts.response_in_progress_with_usage > 0) {
    if (attempt.final_usage.totalTokens > 0) {
      return {
        result_flag: 'expected',
        observation_summary:
          'A usage-bearing response.in_progress event was seen before abort, and final usage remained non-zero after cancellation.',
      };
    }
    return {
      result_flag: 'negative',
      observation_summary:
        'A usage-bearing response.in_progress event was seen before abort, but final usage still ended at zero.',
    };
  }

  if (attempt.raw_event_counts.response_in_progress > 0) {
    return {
      result_flag: 'blocked',
      observation_summary:
        'response.in_progress was observed before abort, but it carried no usage snapshot.',
    };
  }

  return {
    result_flag: 'blocked',
    observation_summary:
      'No response.in_progress event was observed before abort.',
  };
}

function summarizeCase(definition, attempts) {
  const counts = attempts.reduce(
    (accumulator, attempt) => {
      accumulator[attempt.result_flag] += 1;
      if (attempt.raw_event_counts.response_in_progress > 0) {
        accumulator.attempts_with_response_in_progress += 1;
      }
      if (attempt.raw_event_counts.response_in_progress_with_usage > 0) {
        accumulator.attempts_with_usage_snapshot += 1;
      }
      if (attempt.final_usage.totalTokens > 0) {
        accumulator.attempts_with_nonzero_final_usage += 1;
      }
      return accumulator;
    },
    {
      expected: 0,
      negative: 0,
      blocked: 0,
      attempts_with_response_in_progress: 0,
      attempts_with_usage_snapshot: 0,
      attempts_with_nonzero_final_usage: 0,
    },
  );

  const result_flag =
    counts.negative > 0
      ? 'negative'
      : counts.expected > 0 && counts.blocked === 0
        ? 'expected'
        : 'blocked';

  let observation_summary;
  if (definition.abort_after_chars == null) {
    observation_summary = `${counts.expected}/${attempts.length} completed control attempts returned response.completed with non-zero usage.`;
  } else if (counts.negative > 0) {
    observation_summary = `${counts.negative}/${attempts.length} abort attempts saw a usage-bearing response.in_progress event before cancellation but still ended with zero final usage.`;
  } else if (counts.expected > 0) {
    observation_summary = `${counts.expected}/${attempts.length} abort attempts saw a usage-bearing response.in_progress event before cancellation and preserved non-zero final usage. ${counts.blocked}/${attempts.length} attempts were blocked because the live stream never exposed a usable snapshot before abort.`;
  } else {
    observation_summary = `All ${attempts.length} attempts were blocked because the live stream did not expose a usable usage snapshot before abort.`;
  }

  return {
    case_id: definition.case_id,
    scenario: definition.scenario,
    repeats: attempts.length,
    result_flag,
    observation_summary,
    counts,
  };
}

async function runAttempt(definition, attemptIndex, runner, agent) {
  const controller = new AbortController();
  const startedAt = Date.now();
  const runLabel = `${definition.case_id}-attempt-${attemptIndex}`;
  const rawEventTypes = [];
  let text = '';
  let abortTriggered = false;
  let abort_at_chars = null;
  let latestUsageSnapshot = null;
  let error = null;

  const raw_event_counts = {
    response_in_progress: 0,
    response_in_progress_with_usage: 0,
    response_completed: 0,
  };

  const result = await runner.run(agent, makePrompt(runLabel), {
    stream: true,
    maxTurns: 1,
    signal: controller.signal,
  });

  try {
    for await (const event of result) {
      if (event?.type !== 'raw_model_stream_event') {
        continue;
      }

      const data = event.data;
      if (data?.type === 'model') {
        const openaiEventType = data?.event?.type ?? 'unknown';
        rawEventTypes.push(openaiEventType);

        if (openaiEventType === 'response.in_progress') {
          raw_event_counts.response_in_progress += 1;
          if (data?.providerData?.usageSnapshot) {
            raw_event_counts.response_in_progress_with_usage += 1;
            latestUsageSnapshot = serializeUsage(data.providerData.usageSnapshot);
          }
        } else if (openaiEventType === 'response.completed') {
          raw_event_counts.response_completed += 1;
        }

        continue;
      }

      rawEventTypes.push(data?.type ?? 'unknown');
      if (
        definition.abort_after_chars != null &&
        !abortTriggered &&
        data?.type === 'output_text_delta' &&
        typeof data?.delta === 'string'
      ) {
        text += data.delta;
        if (text.length >= definition.abort_after_chars) {
          abortTriggered = true;
          abort_at_chars = text.length;
          controller.abort();
        }
      } else if (
        data?.type === 'output_text_delta' &&
        typeof data?.delta === 'string'
      ) {
        text += data.delta;
      }
    }

    await result.completed;
  } catch (caught) {
    error = serializeError(caught);
  }

  const final_usage = serializeUsage(result.state?.usage);
  const elapsed_ms = Date.now() - startedAt;
  const final_text_chars = text.length;
  const preview = text.slice(0, 200);
  const classification = classifyAttempt(definition, {
    abort_triggered: abortTriggered,
    error,
    final_usage,
    raw_event_counts,
  });

  return {
    attempt_index: attemptIndex,
    mode: 'single-shot',
    abort_after_chars: definition.abort_after_chars,
    abort_triggered: abortTriggered,
    abort_at_chars,
    elapsed_ms,
    final_text_chars,
    preview,
    raw_event_counts,
    raw_event_types_preview: rawEventTypes.slice(0, 40),
    latest_usage_snapshot: latestUsageSnapshot,
    final_usage,
    error,
    result_flag: classification.result_flag,
    observation_summary: classification.observation_summary,
  };
}

async function main() {
  mkdirSync(OUTPUT_DIR, { recursive: true });

  const client = new OpenAI({
    apiKey: OPENAI_API_KEY,
    ...(process.env.OPENAI_BASE_URL
      ? { baseURL: process.env.OPENAI_BASE_URL }
      : {}),
  });

  const model = new OpenAIResponsesModel(client, MODEL);
  const agent = new Agent({
    name: 'LiveStreamingUsageProbe',
    instructions:
      'Follow the user prompt exactly. Do not use tools. Produce plain text only.',
    model,
    modelSettings: {
      temperature: 0,
      maxTokens: MAX_OUTPUT_TOKENS,
      store: false,
      truncation: 'disabled',
    },
  });

  const runner = new Runner({
    tracingDisabled: true,
    workflowName: 'Live abort usage probe',
    modelProvider: {
      getModel() {
        throw new Error(
          'This probe passes an explicit model and should not resolve a default provider model.',
        );
      },
    },
  });

  const attempts_by_case = {};
  const case_summaries = [];

  for (const definition of CASES) {
    const attempts = [];
    for (let index = 1; index <= REPEATS; index += 1) {
      const attempt = await runAttempt(definition, index, runner, agent);
      attempts.push(attempt);
      console.log(
        `[${definition.case_id} ${index}/${REPEATS}] ${attempt.result_flag} - ${attempt.observation_summary}`,
      );
    }
    attempts_by_case[definition.case_id] = attempts;
    case_summaries.push(summarizeCase(definition, attempts));
  }

  const results = {
    context: {
      cwd: process.cwd(),
      git_branch: process.env.GITHUB_HEAD_REF ?? null,
      git_commit_hint: process.env.GITHUB_SHA ?? null,
      node_version: process.version,
      model: MODEL,
      repeats: REPEATS,
      max_output_tokens: MAX_OUTPUT_TOKENS,
      output_dir: OUTPUT_DIR,
      env_vars_used: [
        'OPENAI_API_KEY',
        ...(process.env.OPENAI_BASE_URL ? ['OPENAI_BASE_URL'] : []),
        ...(process.env.LIVE_PROBE_MODEL ? ['LIVE_PROBE_MODEL'] : []),
        ...(process.env.LIVE_PROBE_REPEATS ? ['LIVE_PROBE_REPEATS'] : []),
        ...(process.env.LIVE_PROBE_MAX_TOKENS
          ? ['LIVE_PROBE_MAX_TOKENS']
          : []),
      ],
    },
    cases: CASES,
    case_summaries,
    attempts_by_case,
  };

  const outputPath = join(OUTPUT_DIR, 'results.json');
  writeFileSync(outputPath, `${JSON.stringify(results, null, 2)}\n`);
  console.log(`\nWrote live probe results to ${outputPath}`);
}

main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});

@wsk-builds wsk-builds marked this pull request as ready for review April 20, 2026 15:05

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e1484c776e


Comment thread packages/agents-core/src/run.ts
Comment thread packages/agents-core/src/runner/streaming.ts

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a1ed8a842c


Comment thread packages/agents-core/src/usage.ts Outdated


Development

Successfully merging this pull request may close these issues.

Usage data unavailable when streaming run is aborted via AbortSignal
