Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/replay-aborted-tool-outputs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

Replay managed tool outputs that completed before an abort, so aborted runs don't lose already-finished tool results on the next turn.
88 changes: 88 additions & 0 deletions packages/agents-core/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ import {
} from './runner/sessionPersistence';
import { resolveTurnAfterModelResponse } from './runner/turnResolution';
import { prepareTurn } from './runner/turnPreparation';
import {
clearManagedConversationSupplementalItems,
queueManagedConversationSupplementalItems,
} from './runner/turnPreparation';
import {
applyTurnResult,
handleInterruptedOutcome,
Expand All @@ -85,6 +89,71 @@ import type {
import { tryHandleRunError } from './runner/errorHandlers';
import type { RunErrorHandlers } from './runner/errorHandlers';

/**
 * Scans raw model stream events captured before an abort and synthesizes
 * `function_call_result` items (output text "aborted") for every function
 * call the model emitted, deduplicated by call id. These placeholders keep a
 * server-managed conversation balanced when the run is resumed later.
 */
function extractPendingFunctionCallOutputsFromRawModelEvents(
  rawModelEvents: unknown[],
): AgentInputItem[] {
  // Only these event types carry an output item payload we care about.
  const relevantEventTypes = new Set([
    'response.output_item.added',
    'response.output_item.done',
  ]);
  const syntheticOutputs: AgentInputItem[] = [];
  const handledCallIds = new Set<string>();

  for (const candidate of rawModelEvents) {
    if (!candidate || typeof candidate !== 'object') {
      continue;
    }
    const event = candidate as Record<string, unknown>;
    if (!relevantEventTypes.has(event.type as string)) {
      continue;
    }

    // Providers differ on the payload key: prefer `item`, fall back to
    // `output_item`.
    const payload = (event.item ?? event.output_item) as
      | Record<string, unknown>
      | undefined;
    if (!payload || typeof payload !== 'object') {
      continue;
    }
    if (payload.type !== 'function_call') {
      continue;
    }

    // Accept either snake_case or camelCase call id spellings.
    let callId: string | undefined;
    if (typeof payload.call_id === 'string') {
      callId = payload.call_id;
    } else if (typeof payload.callId === 'string') {
      callId = payload.callId;
    }
    const toolName = typeof payload.name === 'string' ? payload.name : undefined;

    if (callId === undefined || toolName === undefined) {
      continue;
    }
    if (handledCallIds.has(callId)) {
      // A call may appear in both `added` and `done` events; emit it once.
      continue;
    }
    handledCallIds.add(callId);

    syntheticOutputs.push({
      type: 'function_call_result',
      name: toolName,
      ...(typeof payload.namespace === 'string'
        ? { namespace: payload.namespace }
        : {}),
      callId,
      status: 'completed',
      output: {
        type: 'text',
        text: 'aborted',
      },
    });
  }

  return syntheticOutputs;
}

export type {
CallModelInputFilter,
CallModelInputFilterArgs,
Expand Down Expand Up @@ -1150,6 +1219,7 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {

let finalResponse: ModelResponse | undefined = undefined;
let inputMarked = false;
const rawModelEvents: unknown[] = [];
const markInputOnce = () => {
if (inputMarked || !serverConversationTracker) {
return;
Expand Down Expand Up @@ -1212,6 +1282,8 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
requestId: parsed.response.requestId,
};
result.state._context.usage.add(finalResponse.usage);
} else if (event.type === 'model') {
rawModelEvents.push(event.event);
}
if (result.cancelled) {
// When the user's code exits a loop to consume the stream, we need to break
Expand All @@ -1226,6 +1298,16 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
if (sentInputToModel) {
markInputOnce();
}
if (serverConversationTracker?.conversationId) {
const pendingItems =
extractPendingFunctionCallOutputsFromRawModelEvents(
rawModelEvents,
);
queueManagedConversationSupplementalItems(
serverConversationTracker.conversationId,
pendingItems,
);
}
await awaitGuardrailsAndPersistInput();
return;
}
Expand All @@ -1236,6 +1318,12 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
markInputOnce();
}

if (serverConversationTracker?.conversationId && inputMarked) {
clearManagedConversationSupplementalItems(
serverConversationTracker.conversationId,
);
Comment on lines +1321 to +1324
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Clear pending abort outputs on every successful turn

clearManagedConversationSupplementalItems is called only in the streaming path, so abort outputs queued in pendingManagedConversationAbortItems survive successful non-streaming conversationId turns. Because getManagedConversationSupplementalItems always prepends those pending items, a later fresh runner.run(..., { conversationId }) can resend the same synthetic function_call_result again, which can rebalance the transcript incorrectly or trigger duplicate-call errors from the provider. Please clear the queue after successful non-streaming model calls as well.

Useful? React with 👍 / 👎.

}

await awaitGuardrailsAndPersistInput();

if (result.cancelled) {
Expand Down
58 changes: 54 additions & 4 deletions packages/agents-core/src/runner/turnPreparation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,63 @@ const managedConversationSupplementalItemsCache = new WeakMap<
ProcessedResponse<any>,
AgentInputItem[]
>();
// Process-global queue of synthetic tool-output items, keyed by server
// conversation id, waiting to be replayed on the next turn of a conversation
// whose previous turn was aborted. Entries are removed by
// clearManagedConversationSupplementalItems once a turn succeeds.
const pendingManagedConversationAbortItems = new Map<string, AgentInputItem[]>();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound pending abort cache to avoid unbounded growth

The new pendingManagedConversationAbortItems cache is a process-global Map keyed by conversationId, but this commit adds no general eviction path for entries that are never replayed successfully. Since queueManagedConversationSupplementalItems keeps inserting IDs and cleanup currently depends on later turn success, aborted conversations that are never resumed can accumulate indefinitely in long-lived workers, causing memory growth over time. Please add a bounded eviction strategy (for example TTL/size cap, and/or broader clear paths) so orphaned conversation IDs do not stay resident forever.

Useful? React with 👍 / 👎.


// Upper bound on how many conversations may hold pending abort items at once.
// The map is process-global and entries are otherwise only removed when a
// later turn for the same conversation succeeds, so without a cap, aborted
// conversations that are never resumed would accumulate indefinitely in
// long-lived workers. When the cap is exceeded, the least recently touched
// conversation is evicted first (Map preserves insertion order).
const MAX_PENDING_ABORT_CONVERSATIONS = 1000;

/**
 * Queues synthetic `function_call_result` items to replay on the next turn of
 * a server-managed conversation whose current turn was aborted.
 *
 * Items are merged with any previously queued items for the same conversation
 * and deduplicated by `callId`; non-`function_call_result` items and items
 * without a string `callId` are ignored.
 *
 * @param conversationId - Server conversation id; no-op when undefined.
 * @param items - Candidate items to queue; no-op when empty.
 */
export function queueManagedConversationSupplementalItems(
  conversationId: string | undefined,
  items: AgentInputItem[],
): void {
  if (!conversationId || items.length === 0) {
    return;
  }

  const existing = pendingManagedConversationAbortItems.get(conversationId) ?? [];
  const merged = [...existing];
  const seenCallIds = new Set(
    existing.flatMap((item) =>
      item.type === 'function_call_result' && typeof item.callId === 'string'
        ? [item.callId]
        : [],
    ),
  );

  for (const item of items) {
    if (
      item.type !== 'function_call_result' ||
      typeof item.callId !== 'string' ||
      seenCallIds.has(item.callId)
    ) {
      continue;
    }
    merged.push(item);
    seenCallIds.add(item.callId);
  }

  if (merged.length === 0) {
    return;
  }

  // Delete-then-set refreshes this conversation's position in the Map's
  // insertion order so active conversations are evicted last.
  pendingManagedConversationAbortItems.delete(conversationId);
  pendingManagedConversationAbortItems.set(conversationId, merged);

  // Enforce the size cap by dropping the oldest (first-inserted) entries.
  while (
    pendingManagedConversationAbortItems.size > MAX_PENDING_ABORT_CONVERSATIONS
  ) {
    const oldest = pendingManagedConversationAbortItems.keys().next().value;
    if (oldest === undefined) {
      break;
    }
    pendingManagedConversationAbortItems.delete(oldest);
  }
}

/**
 * Drops any queued abort-replay items for the given server-managed
 * conversation, typically after a turn completes successfully.
 *
 * @param conversationId - Server conversation id; no-op when undefined.
 */
export function clearManagedConversationSupplementalItems(
  conversationId: string | undefined,
): void {
  // Without a conversation id there is nothing keyed in the queue to remove.
  if (conversationId) {
    pendingManagedConversationAbortItems.delete(conversationId);
  }
}

export function getManagedConversationSupplementalItems<
TContext,
TAgent extends Agent<TContext, AgentOutputType>,
>(state: RunState<TContext, TAgent>): AgentInputItem[] {
const pendingAbortItems = state._conversationId
? pendingManagedConversationAbortItems.get(state._conversationId) ?? []
: [];
const processedResponse = state._lastProcessedResponse;
const handoffs = processedResponse?.handoffs;
if (!handoffs || handoffs.length <= 1) {
return [];
return pendingAbortItems;
}

const acceptedCallId = handoffs[0]?.toolCall.callId;
Expand All @@ -156,13 +204,15 @@ export function getManagedConversationSupplementalItems<
item.rawItem.callId === acceptedCallId,
);
if (!acceptedHandoffOutputStillPresent) {
return [];
return pendingAbortItems;
}

const cached =
managedConversationSupplementalItemsCache.get(processedResponse);
if (cached) {
return cached;
return pendingAbortItems.length > 0
? [...pendingAbortItems, ...cached]
: cached;
}

// Server-managed transcripts still contain ignored handoff calls from the last response.
Expand All @@ -173,7 +223,7 @@ export function getManagedConversationSupplementalItems<
getToolCallOutputItem(toolCall, IGNORED_HANDOFF_OUTPUT_MESSAGE),
);
managedConversationSupplementalItemsCache.set(processedResponse, items);
return items;
return pendingAbortItems.length > 0 ? [...pendingAbortItems, ...items] : items;
}

async function runInputGuardrailsForTurn<
Expand Down
124 changes: 124 additions & 0 deletions packages/agents-core/test/run.stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,130 @@ describe('Runner.run (streaming)', () => {
]);
});

// Verifies abort recovery for server-managed conversations: a turn aborted
// mid-stream after the model emitted a function_call should queue a synthetic
// tool output, and the next turn on the same conversationId should replay it.
it('replays synthetic tool outputs after an aborted managed conversation turn', async () => {
// Fake streaming model: attempt 1 emits a function_call item and then blocks
// until the request's abort signal fires; attempt 2 completes normally.
class AbortAfterFunctionCallStreamingModel implements Model {
// Records a deep copy of each request's input so later assertions can
// inspect exactly what was sent per turn.
public readonly requests: ModelRequest[] = [];
private attempt = 0;

async getResponse(): Promise<ModelResponse> {
// Non-streaming path is intentionally unreachable in this test.
throw new Error('not used');
}

async *getStreamedResponse(
request: ModelRequest,
): AsyncIterable<StreamEvent> {
// Snapshot the input array (deep copy) so post-run mutations don't
// affect the recorded request.
this.requests.push({
...request,
input: Array.isArray(request.input)
? (JSON.parse(JSON.stringify(request.input)) as AgentInputItem[])
: request.input,
});
this.attempt += 1;

if (this.attempt === 1) {
yield { type: 'response_started' } as any;
// Raw model event carrying a completed function_call item — this is
// what the runner should turn into a synthetic replayed output.
yield {
type: 'model',
event: {
type: 'response.output_item.done',
item: {
type: 'function_call',
id: 'fc_abort_1',
call_id: 'call_abort_1',
name: 'test',
arguments: '{"test":"abort"}',
},
},
providerData: {
rawModelEventSource: 'openai-responses',
},
} as any;

// Block until the caller aborts; reject with an AbortError so the
// runner treats this turn as aborted rather than failed.
const abortError = new Error('aborted');
(abortError as Error & { name: string }).name = 'AbortError';
const signal = request.signal as AbortSignal | undefined;
await new Promise((_resolve, reject) => {
if (signal?.aborted) {
reject(abortError);
return;
}
const onAbort = () => {
signal?.removeEventListener('abort', onAbort);
reject(abortError);
};
signal?.addEventListener('abort', onAbort, { once: true });
});
return;
}

// Second attempt: finish the conversation with a plain message.
yield {
type: 'response_done',
response: {
id: 'resp-after-abort',
usage: {
requests: 1,
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
},
output: [fakeModelMessage('done after abort')],
},
} as any;
}
}

const model = new AbortAfterFunctionCallStreamingModel();
const agent = new Agent({
name: 'ManagedAbortRecovery',
model,
tools: [serverTool],
});
const runner = new Runner();
const controller = new AbortController();

// First turn: streamed run on a managed conversation, abortable via signal.
const firstRun = await runner.run(agent, 'hi', {
stream: true,
conversationId: 'conv-abort-recovery',
signal: controller.signal,
});
const reader = (firstRun.toStream() as any).getReader();

// Consume the stream until the function_call item is observed, then abort —
// this mirrors a user cancelling mid-turn after a tool call was emitted.
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
if (
value?.type === 'raw_model_stream_event' &&
value.data?.type === 'model' &&
value.data?.event?.type === 'response.output_item.done'
) {
controller.abort();
break;
}
}
await firstRun.completed;

// Second turn: resume the same managed conversation with a fresh run.
const secondRun = await runner.run(agent, 'resume after abort', {
stream: true,
conversationId: 'conv-abort-recovery',
});
await drain(secondRun);

expect(secondRun.finalOutput).toBe('done after abort');
expect(model.requests).toHaveLength(2);
// The second request must include the new user message plus the replayed
// synthetic function_call_result for the aborted call.
expect(getRequestInputItems(model.requests[1])).toEqual([
expect.objectContaining({
role: 'user',
}),
expect.objectContaining({
type: 'function_call_result',
callId: 'call_abort_1',
name: 'test',
}),
]);
});

it('does not replay orphan hosted shell calls in default streamed multi-turn runs', async () => {
const hostedShell = shellTool({
environment: { type: 'container_auto' },
Expand Down