-
Notifications
You must be signed in to change notification settings - Fork 708
fix(core): replay aborted managed tool outputs #1191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| '@openai/agents-core': patch | ||
| --- | ||
|
|
||
| Replay managed tool outputs that completed before an abort, so aborted runs don't lose already-finished tool results on the next turn. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -135,15 +135,63 @@ const managedConversationSupplementalItemsCache = new WeakMap< | |
| ProcessedResponse<any>, | ||
| AgentInputItem[] | ||
| >(); | ||
| const pendingManagedConversationAbortItems = new Map<string, AgentInputItem[]>(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new Useful? React with 👍 / 👎. |
||
|
|
||
| export function queueManagedConversationSupplementalItems( | ||
| conversationId: string | undefined, | ||
| items: AgentInputItem[], | ||
| ): void { | ||
| if (!conversationId || items.length === 0) { | ||
| return; | ||
| } | ||
|
|
||
| const existing = pendingManagedConversationAbortItems.get(conversationId) ?? []; | ||
| const merged = [...existing]; | ||
| const seenCallIds = new Set( | ||
| existing.flatMap((item) => | ||
| item.type === 'function_call_result' && typeof item.callId === 'string' | ||
| ? [item.callId] | ||
| : [], | ||
| ), | ||
| ); | ||
|
|
||
| for (const item of items) { | ||
| if ( | ||
| item.type !== 'function_call_result' || | ||
| typeof item.callId !== 'string' || | ||
| seenCallIds.has(item.callId) | ||
| ) { | ||
| continue; | ||
| } | ||
| merged.push(item); | ||
| seenCallIds.add(item.callId); | ||
| } | ||
|
|
||
| if (merged.length > 0) { | ||
| pendingManagedConversationAbortItems.set(conversationId, merged); | ||
| } | ||
| } | ||
|
|
||
| export function clearManagedConversationSupplementalItems( | ||
| conversationId: string | undefined, | ||
| ): void { | ||
| if (!conversationId) { | ||
| return; | ||
| } | ||
| pendingManagedConversationAbortItems.delete(conversationId); | ||
| } | ||
|
|
||
| export function getManagedConversationSupplementalItems< | ||
| TContext, | ||
| TAgent extends Agent<TContext, AgentOutputType>, | ||
| >(state: RunState<TContext, TAgent>): AgentInputItem[] { | ||
| const pendingAbortItems = state._conversationId | ||
| ? pendingManagedConversationAbortItems.get(state._conversationId) ?? [] | ||
| : []; | ||
| const processedResponse = state._lastProcessedResponse; | ||
| const handoffs = processedResponse?.handoffs; | ||
| if (!handoffs || handoffs.length <= 1) { | ||
| return []; | ||
| return pendingAbortItems; | ||
| } | ||
|
|
||
| const acceptedCallId = handoffs[0]?.toolCall.callId; | ||
|
|
@@ -156,13 +204,15 @@ export function getManagedConversationSupplementalItems< | |
| item.rawItem.callId === acceptedCallId, | ||
| ); | ||
| if (!acceptedHandoffOutputStillPresent) { | ||
| return []; | ||
| return pendingAbortItems; | ||
| } | ||
|
|
||
| const cached = | ||
| managedConversationSupplementalItemsCache.get(processedResponse); | ||
| if (cached) { | ||
| return cached; | ||
| return pendingAbortItems.length > 0 | ||
| ? [...pendingAbortItems, ...cached] | ||
| : cached; | ||
| } | ||
|
|
||
| // Server-managed transcripts still contain ignored handoff calls from the last response. | ||
|
|
@@ -173,7 +223,7 @@ export function getManagedConversationSupplementalItems< | |
| getToolCallOutputItem(toolCall, IGNORED_HANDOFF_OUTPUT_MESSAGE), | ||
| ); | ||
| managedConversationSupplementalItemsCache.set(processedResponse, items); | ||
| return items; | ||
| return pendingAbortItems.length > 0 ? [...pendingAbortItems, ...items] : items; | ||
| } | ||
|
|
||
| async function runInputGuardrailsForTurn< | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clearManagedConversationSupplementalItemsis called only in the streaming path, so abort outputs queued inpendingManagedConversationAbortItemssurvive successful non-streamingconversationIdturns. BecausegetManagedConversationSupplementalItemsalways prepends those pending items, a later freshrunner.run(..., { conversationId })can resend the same syntheticfunction_call_resultagain, which can rebalance the transcript incorrectly or trigger duplicate-call errors from the provider. Please clear the queue after successful non-streaming model calls as well.Useful? React with 👍 / 👎.