fix: preserve streaming usage after AbortSignal cancellation #1178
wsk-builds wants to merge 6 commits into openai:main
Conversation
🦋 Changeset detected
Latest commit: 87204f3
The changes in this PR will be included in the next version bump. This PR includes changesets to release 5 packages.
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8ae0af9247
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Addressed both Codex review findings for the streaming usage snapshot path. The earlier streaming-snapshot issue was caused by replacement logic that always read index 0. It has been updated to replace the current entry in the full trackedRequestUsageEntries set instead, so failed-retry placeholders and the final successful request entry stay consistent with the aggregate totals. Also added targeted regression tests covering this behavior in packages/agents-core/test/usage.test.ts. Re-ran the required verification stack locally, and everything passed: pnpm i
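To make the index-0 bug concrete, here is a minimal, self-contained sketch of the replacement semantics described above. TrackedUsage and its methods are hypothetical simplifications for illustration, not the SDK's actual API:

```javascript
// Hypothetical, simplified model of per-request usage tracking.
// Replacing the LAST entry (the request currently in flight) keeps earlier
// failed-retry placeholders intact; always replacing index 0 would not.
class TrackedUsage {
  constructor() {
    this.entries = []; // one entry per request attempt
  }
  startRequest() {
    this.entries.push({ totalTokens: 0 }); // placeholder until usage arrives
  }
  replaceCurrentSnapshot(snapshot) {
    if (this.entries.length === 0) {
      this.entries.push({ ...snapshot });
      return;
    }
    // Replace the in-flight request's entry, not entries[0].
    this.entries[this.entries.length - 1] = { ...snapshot };
  }
  get totalTokens() {
    return this.entries.reduce((sum, entry) => sum + entry.totalTokens, 0);
  }
}

const usage = new TrackedUsage();
usage.startRequest(); // attempt 1 fails and keeps its zero placeholder
usage.startRequest(); // attempt 2 is the retry currently streaming
usage.replaceCurrentSnapshot({ totalTokens: 120 });
usage.replaceCurrentSnapshot({ totalTokens: 250 }); // newer snapshot supersedes
console.log(usage.entries.length, usage.totalTokens); // 2 250
```

Replacing entries[0] instead would attribute the streamed usage to the failed first attempt and leave the live request's entry at zero, so the per-request entries no longer match the aggregate totals.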
Thanks for sharing this. At a glance, this may be good, but I don't yet have confidence in how effective this is. If you're willing to spend more time on this, can you use the
I used the
In controlled streaming scenarios where the provider surfaces usage on
What remains unverified is real-world effectiveness against live Responses streams, because that depends on whether
To make that easy to verify, I prepared a ready-to-run live probe that a maintainer can execute with their own
I’m pasting the temporary script and exact run instructions in the next comment so it can be executed on
Ready-to-run live probe and exact instructions
This probe is meant to validate real OpenAI Responses streaming behavior against a checkout that already has
It records:
The script reads these environment variables:
The probe uses only synthetic prompts and sets
How to run
Use the same model and repeat count on both branches. Run on
tmpdir=$(mktemp -d)
pnpm build
export OPENAI_API_KEY=...
export LIVE_PROBE_MODEL=gpt-4.1-mini
export LIVE_PROBE_REPEATS=3
export PROBE_OUTPUT_DIR="$tmpdir/main"
node /tmp/live-probe.mjs
Run on this branch from the repository root:
pnpm build
export OPENAI_API_KEY=...
export LIVE_PROBE_MODEL=gpt-4.1-mini
export LIVE_PROBE_REPEATS=3
export PROBE_OUTPUT_DIR="$tmpdir/candidate"
node /tmp/live-probe.mjs
Quick comparison:
jq '{model: .context.model, case_summaries}' "$tmpdir/main/results.json"
jq '{model: .context.model, case_summaries}' "$tmpdir/candidate/results.json"
Interpretation:
If
If both branches are mostly
Temporary script
import { mkdirSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join, resolve } from 'node:path';
import OpenAI from 'openai';
import { Agent, Runner } from '@openai/agents-core';
import { OpenAIResponsesModel } from '@openai/agents-openai';
const OPENAI_API_KEY = process.env.OPENAI_API_KEY;
if (!OPENAI_API_KEY) {
throw new Error(
'Missing OPENAI_API_KEY. Set it before running this live probe.',
);
}
const MODEL = process.env.LIVE_PROBE_MODEL ?? 'gpt-4.1-mini';
const REPEATS = parsePositiveInt(process.env.LIVE_PROBE_REPEATS, 3);
const MAX_OUTPUT_TOKENS = parsePositiveInt(
process.env.LIVE_PROBE_MAX_TOKENS,
2400,
);
const OUTPUT_DIR = resolve(
process.env.PROBE_OUTPUT_DIR ??
join(
tmpdir(),
`openai-agents-live-probe-${Date.now().toString(36)}`,
),
);
const CASES = [
{
case_id: 'C1',
scenario: 'completed control',
abort_after_chars: null,
},
{
case_id: 'E1',
scenario: 'early abort',
abort_after_chars: 80,
},
{
case_id: 'M1',
scenario: 'mid abort',
abort_after_chars: 600,
},
{
case_id: 'L1',
scenario: 'late abort',
abort_after_chars: 1800,
},
];
function parsePositiveInt(value, fallback) {
const parsed = Number.parseInt(value ?? '', 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}
function serializeError(error) {
if (error instanceof Error) {
return {
name: error.name,
message: error.message,
stack: error.stack ?? null,
};
}
return {
name: 'UnknownError',
message: String(error),
stack: null,
};
}
function serializeUsage(usage) {
return {
requests: usage?.requests ?? 0,
inputTokens: usage?.inputTokens ?? 0,
outputTokens: usage?.outputTokens ?? 0,
totalTokens: usage?.totalTokens ?? 0,
inputTokensDetails: Array.isArray(usage?.inputTokensDetails)
? usage.inputTokensDetails
: [],
outputTokensDetails: Array.isArray(usage?.outputTokensDetails)
? usage.outputTokensDetails
: [],
requestUsageEntries: Array.isArray(usage?.requestUsageEntries)
? usage.requestUsageEntries.map((entry) => ({
inputTokens: entry?.inputTokens ?? 0,
outputTokens: entry?.outputTokens ?? 0,
totalTokens: entry?.totalTokens ?? 0,
inputTokensDetails: entry?.inputTokensDetails ?? {},
outputTokensDetails: entry?.outputTokensDetails ?? {},
endpoint: entry?.endpoint ?? null,
}))
: [],
};
}
function makePrompt(runLabel) {
return [
'You are participating in a streaming runtime probe.',
'Output exactly 160 numbered lines.',
'Each line must be a short factual statement about mechanical clocks.',
'Each line must be between 8 and 12 words.',
'Do not add any introduction or conclusion.',
`Probe nonce: ${runLabel}.`,
].join('\n');
}
function classifyAttempt(definition, attempt) {
if (attempt.error) {
return {
result_flag: 'negative',
observation_summary: `Run failed with ${attempt.error.name}: ${attempt.error.message}.`,
};
}
if (definition.abort_after_chars == null) {
if (
attempt.raw_event_counts.response_completed > 0 &&
attempt.final_usage.totalTokens > 0
) {
return {
result_flag: 'expected',
observation_summary:
'Completed control reached response.completed and returned non-zero usage.',
};
}
return {
result_flag: 'negative',
observation_summary:
'Completed control did not return both response.completed and non-zero final usage.',
};
}
if (!attempt.abort_triggered) {
return {
result_flag: 'blocked',
observation_summary:
'Abort threshold was not reached before the response ended.',
};
}
if (attempt.raw_event_counts.response_completed > 0) {
return {
result_flag: 'blocked',
observation_summary:
'The model completed before AbortSignal materially interrupted the stream.',
};
}
if (attempt.raw_event_counts.response_in_progress_with_usage > 0) {
if (attempt.final_usage.totalTokens > 0) {
return {
result_flag: 'expected',
observation_summary:
          'A usage-bearing response.in_progress event was seen before abort, and final usage remained non-zero after cancellation.',
};
}
return {
result_flag: 'negative',
observation_summary:
'A usage-bearing response.in_progress event was seen before abort, but final usage still ended at zero.',
};
}
if (attempt.raw_event_counts.response_in_progress > 0) {
return {
result_flag: 'blocked',
observation_summary:
'response.in_progress was observed before abort, but it carried no usage snapshot.',
};
}
return {
result_flag: 'blocked',
observation_summary:
'No response.in_progress event was observed before abort.',
};
}
function summarizeCase(definition, attempts) {
const counts = attempts.reduce(
(accumulator, attempt) => {
accumulator[attempt.result_flag] += 1;
if (attempt.raw_event_counts.response_in_progress > 0) {
accumulator.attempts_with_response_in_progress += 1;
}
if (attempt.raw_event_counts.response_in_progress_with_usage > 0) {
accumulator.attempts_with_usage_snapshot += 1;
}
if (attempt.final_usage.totalTokens > 0) {
accumulator.attempts_with_nonzero_final_usage += 1;
}
return accumulator;
},
{
expected: 0,
negative: 0,
blocked: 0,
attempts_with_response_in_progress: 0,
attempts_with_usage_snapshot: 0,
attempts_with_nonzero_final_usage: 0,
},
);
const result_flag =
counts.negative > 0
? 'negative'
: counts.expected > 0 && counts.blocked === 0
? 'expected'
: 'blocked';
let observation_summary;
if (definition.abort_after_chars == null) {
    observation_summary = `${counts.expected}/${attempts.length} completed control attempts returned response.completed with non-zero usage.`;
} else if (counts.negative > 0) {
    observation_summary = `${counts.negative}/${attempts.length} abort attempts saw a usage-bearing response.in_progress event before cancellation but still ended with zero final usage.`;
} else if (counts.expected > 0) {
    observation_summary = `${counts.expected}/${attempts.length} abort attempts saw a usage-bearing response.in_progress event before cancellation and preserved non-zero final usage. ${counts.blocked}/${attempts.length} attempts were blocked because the live stream never exposed a usable snapshot before abort.`;
} else {
    observation_summary = `All ${attempts.length} attempts were blocked because the live stream did not expose a usable usage snapshot before abort.`;
}
return {
case_id: definition.case_id,
scenario: definition.scenario,
repeats: attempts.length,
result_flag,
observation_summary,
counts,
};
}
async function runAttempt(definition, attemptIndex, runner, agent) {
const controller = new AbortController();
const startedAt = Date.now();
const runLabel = `${definition.case_id}-attempt-${attemptIndex}`;
const rawEventTypes = [];
let text = '';
let abortTriggered = false;
let abort_at_chars = null;
let latestUsageSnapshot = null;
let error = null;
const raw_event_counts = {
response_in_progress: 0,
response_in_progress_with_usage: 0,
response_completed: 0,
};
const result = await runner.run(agent, makePrompt(runLabel), {
stream: true,
maxTurns: 1,
signal: controller.signal,
});
try {
for await (const event of result) {
if (event?.type !== 'raw_model_stream_event') {
continue;
}
const data = event.data;
if (data?.type === 'model') {
const openaiEventType = data?.event?.type ?? 'unknown';
rawEventTypes.push(openaiEventType);
if (openaiEventType === 'response.in_progress') {
raw_event_counts.response_in_progress += 1;
if (data?.providerData?.usageSnapshot) {
raw_event_counts.response_in_progress_with_usage += 1;
latestUsageSnapshot = serializeUsage(data.providerData.usageSnapshot);
}
} else if (openaiEventType === 'response.completed') {
raw_event_counts.response_completed += 1;
}
continue;
}
rawEventTypes.push(data?.type ?? 'unknown');
if (
definition.abort_after_chars != null &&
!abortTriggered &&
data?.type === 'output_text_delta' &&
typeof data?.delta === 'string'
) {
text += data.delta;
if (text.length >= definition.abort_after_chars) {
abortTriggered = true;
abort_at_chars = text.length;
controller.abort();
}
} else if (
data?.type === 'output_text_delta' &&
typeof data?.delta === 'string'
) {
text += data.delta;
}
}
await result.completed;
} catch (caught) {
error = serializeError(caught);
}
const final_usage = serializeUsage(result.state?.usage);
const elapsed_ms = Date.now() - startedAt;
const final_text_chars = text.length;
const preview = text.slice(0, 200);
const classification = classifyAttempt(definition, {
abort_triggered: abortTriggered,
error,
final_usage,
raw_event_counts,
});
return {
attempt_index: attemptIndex,
mode: 'single-shot',
abort_after_chars: definition.abort_after_chars,
abort_triggered: abortTriggered,
abort_at_chars,
elapsed_ms,
final_text_chars,
preview,
raw_event_counts,
raw_event_types_preview: rawEventTypes.slice(0, 40),
latest_usage_snapshot: latestUsageSnapshot,
final_usage,
error,
result_flag: classification.result_flag,
observation_summary: classification.observation_summary,
};
}
async function main() {
mkdirSync(OUTPUT_DIR, { recursive: true });
const client = new OpenAI({
apiKey: OPENAI_API_KEY,
...(process.env.OPENAI_BASE_URL
? { baseURL: process.env.OPENAI_BASE_URL }
: {}),
});
const model = new OpenAIResponsesModel(client, MODEL);
const agent = new Agent({
name: 'LiveStreamingUsageProbe',
instructions:
'Follow the user prompt exactly. Do not use tools. Produce plain text only.',
model,
modelSettings: {
temperature: 0,
maxTokens: MAX_OUTPUT_TOKENS,
store: false,
truncation: 'disabled',
},
});
const runner = new Runner({
tracingDisabled: true,
workflowName: 'Live abort usage probe',
modelProvider: {
getModel() {
throw new Error(
'This probe passes an explicit model and should not resolve a default provider model.',
);
},
},
});
const attempts_by_case = {};
const case_summaries = [];
for (const definition of CASES) {
const attempts = [];
for (let index = 1; index <= REPEATS; index += 1) {
const attempt = await runAttempt(definition, index, runner, agent);
attempts.push(attempt);
console.log(
`[${definition.case_id} ${index}/${REPEATS}] ${attempt.result_flag} - ${attempt.observation_summary}`,
);
}
attempts_by_case[definition.case_id] = attempts;
case_summaries.push(summarizeCase(definition, attempts));
}
const results = {
context: {
cwd: process.cwd(),
git_branch: process.env.GITHUB_HEAD_REF ?? null,
git_commit_hint: process.env.GITHUB_SHA ?? null,
node_version: process.version,
model: MODEL,
repeats: REPEATS,
max_output_tokens: MAX_OUTPUT_TOKENS,
output_dir: OUTPUT_DIR,
env_vars_used: [
'OPENAI_API_KEY',
...(process.env.OPENAI_BASE_URL ? ['OPENAI_BASE_URL'] : []),
...(process.env.LIVE_PROBE_MODEL ? ['LIVE_PROBE_MODEL'] : []),
...(process.env.LIVE_PROBE_REPEATS ? ['LIVE_PROBE_REPEATS'] : []),
...(process.env.LIVE_PROBE_MAX_TOKENS
? ['LIVE_PROBE_MAX_TOKENS']
: []),
],
},
cases: CASES,
case_summaries,
attempts_by_case,
};
const outputPath = join(OUTPUT_DIR, 'results.json');
writeFileSync(outputPath, `${JSON.stringify(results, null, 2)}\n`);
console.log(`\nWrote live probe results to ${outputPath}`);
}
main().catch((error) => {
console.error(error);
process.exitCode = 1;
});
💡 Codex Review
Reviewed commit: e1484c776e
💡 Codex Review
Reviewed commit: a1ed8a842c
Summary
- Updated agents-core so aborted streams keep usage instead of waiting for response_done
- Capture usage snapshots from response.in_progress raw model events

Why
This pull request resolves #995.
Streaming usage was only accumulated when a terminal response_done event arrived. When callers aborted a streamed run early, result.state.usage could remain at zero even though the provider had already reported usage, which breaks billing, quota, and observability workflows.

What Changed
- Added Usage.replaceCurrentRequestSnapshot() and wired the streaming runner to replace the in-flight request usage snapshot instead of only adding usage on response_done
- Added getUsageSnapshotFromStreamEvent() so runners can consume normalized provider usage snapshots from raw model events
- Attached usageSnapshot to response.in_progress raw events and reused a shared usage conversion helper
- Added agents-core and agents-openai regression tests plus a patch changeset

Testing
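The behavioral difference the change targets can be sketched with a toy stream consumer. The event shapes below are invented for illustration and are not the SDK's real stream types: terminal-only accounting loses everything when the stream is aborted, while replacing an in-flight snapshot on each usage-bearing event preserves the last known totals.

```javascript
// Toy stream consumer. With terminal-only accounting, an abort before
// response_done leaves usage at zero; with snapshot replacement, the last
// usage-bearing response.in_progress event survives the abort.
function finalUsage(events, { useSnapshots }) {
  let usage = 0;
  for (const event of events) {
    if (event.type === 'aborted') break; // caller cancelled the stream
    if (
      useSnapshots &&
      event.type === 'response.in_progress' &&
      event.usage != null
    ) {
      usage = event.usage; // replace the in-flight snapshot
    }
    if (event.type === 'response_done') {
      usage = event.usage; // terminal accounting, as before the change
    }
  }
  return usage;
}

const abortedStream = [
  { type: 'response.in_progress', usage: 40 },
  { type: 'response.in_progress', usage: 90 },
  { type: 'aborted' }, // response_done never arrives
];

console.log(finalUsage(abortedStream, { useSnapshots: false })); // 0
console.log(finalUsage(abortedStream, { useSnapshots: true })); // 90
```

For streams that do complete, the terminal response_done value still wins, which matches the stated fallback for providers that never emit a usage snapshot.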
- pnpm exec vitest run packages/agents-core/test/usage.test.ts packages/agents-core/test/run.stream.test.ts packages/agents-openai/test/openaiResponsesModel.test.ts
- pnpm exec prettier --check packages/agents-core/src/usage.ts packages/agents-core/src/runner/streaming.ts packages/agents-core/src/run.ts packages/agents-core/test/usage.test.ts packages/agents-core/test/run.stream.test.ts packages/agents-openai/src/openaiResponsesModel.ts packages/agents-openai/test/openaiResponsesModel.test.ts
- CI=1 pnpm i --offline
- pnpm build
- pnpm -r build-check
- pnpm -r -F "@openai/*" dist:check
- pnpm lint
- pnpm test (fails in the existing packages/agents-core/test/result.test.ts leak checks because spawnSync(process.execPath, ['--import', 'tsx', ...]) captures empty stdout in this environment; the two manual leak scripts still pass when run directly)

Risks / Follow-ups
- The fix relies on providers emitting usageSnapshot data; providers without that signal keep the existing response_done behavior
- tsx/spawnSync stdout-capture issue noted above