Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/openai/lib/streaming/_assistants.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,10 +977,37 @@ def accumulate_event(
return current_message_snapshot, new_content


def _normalize_indexed_list(items: list[object]) -> list[object]:
"""Merge list entries that share the same `index` key.

Some providers send multiple delta entries with the same index in a single
chunk (e.g. first tool_call chunk contains both the id/name AND the start
of arguments, both at index 0). Without merging, the second entry is
stranded and never accumulated into.
"""
by_index: dict[int, dict[object, object]] = {}
order: list[int] = []
for item in items:
if not is_dict(item):
return items # non-dict list → nothing to normalise
idx = item.get("index") # type: ignore[union-attr]
if not isinstance(idx, int):
return items # no integer index → nothing to normalise
if idx not in by_index:
by_index[idx] = item # type: ignore[assignment]
order.append(idx)
else:
by_index[idx] = accumulate_delta(by_index[idx], item) # type: ignore[arg-type]
return [by_index[i] for i in order] # type: ignore[misc]


def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
for key, delta_value in delta.items():
if key not in acc:
acc[key] = delta_value
if is_list(delta_value) and delta_value:
acc[key] = _normalize_indexed_list(delta_value)
else:
acc[key] = delta_value
continue

acc_value = acc[key]
Expand Down
29 changes: 28 additions & 1 deletion src/openai/lib/streaming/_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,37 @@
from ..._utils import is_dict, is_list


def _normalize_indexed_list(items: list[object]) -> list[object]:
"""Merge list entries that share the same `index` key.

Some providers send multiple delta entries with the same index in a single
chunk (e.g. first tool_call chunk contains both the id/name AND the start
of arguments, both at index 0). Without merging, the second entry is
stranded and never accumulated into.
"""
by_index: dict[int, dict[object, object]] = {}
order: list[int] = []
for item in items:
if not is_dict(item):
return items # non-dict list → nothing to normalise
idx = item.get("index") # type: ignore[union-attr]
if not isinstance(idx, int):
return items # no integer index → nothing to normalise
if idx not in by_index:
by_index[idx] = item # type: ignore[assignment]
order.append(idx)
else:
by_index[idx] = accumulate_delta(by_index[idx], item) # type: ignore[arg-type]
return [by_index[i] for i in order] # type: ignore[misc]


def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]:
for key, delta_value in delta.items():
if key not in acc:
acc[key] = delta_value
if is_list(delta_value) and delta_value:
acc[key] = _normalize_indexed_list(delta_value)
Comment on lines +33 to +34

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Normalize duplicate tool calls in the initial chunk

When the duplicate tool_calls entries arrive in the very first chat streaming chunk, this branch is never reached: ChatCompletionStreamState._accumulate_chunk() returns _convert_initial_chunk_into_snapshot() while the snapshot is None, and that path copies choice.delta.to_dict() directly. In that provider scenario, the first chunk still stores two entries with index == 0, so the next chunk merges into tool_calls[0] and leaves the argument prefix stranded in tool_calls[1]; apply the same normalization in the initial conversion path or route the first chunk through the accumulator.

Useful? React with 👍 / 👎.

else:
acc[key] = delta_value
continue

acc_value = acc[key]
Expand Down
135 changes: 135 additions & 0 deletions tests/test_accumulate_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Tests for accumulate_delta handling of duplicate-index entries.

Issue #3201: when the first streamed chunk contains multiple tool_calls entries
with the same `index`, the accumulator stores them as separate list entries.
Subsequent chunks only merge into the first one, leaving the second stranded.
"""

from __future__ import annotations

from openai.lib.streaming._deltas import accumulate_delta


def test_single_index_in_first_chunk():
    """Baseline: a first chunk with exactly one entry per index accumulates normally."""
    # Chunk 1 carries the id/name of the tool call at index 0.
    header_chunk = {
        "tool_calls": [
            {"index": 0, "id": "call_1", "function": {"name": "list_files"}, "type": "function"},
        ]
    }
    # Chunk 2 carries the arguments for that same index.
    arguments_chunk = {
        "tool_calls": [
            {"index": 0, "function": {"arguments": ' {"path": "."}'}},
        ]
    }

    snapshot: dict[object, object] = {}
    for chunk in (header_chunk, arguments_chunk):
        snapshot = accumulate_delta(snapshot, chunk)

    calls = snapshot["tool_calls"]
    assert isinstance(calls, list)
    assert len(calls) == 1

    only_call = calls[0]
    assert only_call["id"] == "call_1"
    assert only_call["function"]["name"] == "list_files"
    assert only_call["function"]["arguments"] == ' {"path": "."}'


def test_duplicate_index_in_first_chunk():
    """Issue #3201: the first chunk repeats index 0 in two separate entries."""
    # Opening chunk: id+name and the start of the arguments, both at index 0.
    opening_chunk = {
        "tool_calls": [
            {"index": 0, "id": "call_1", "function": {"name": "list_files"}, "type": "function"},
            {"index": 0, "function": {"arguments": ' {"'}},
        ]
    }
    # Follow-up chunk: the remainder of the arguments for index 0.
    followup_chunk = {
        "tool_calls": [
            {"index": 0, "function": {"arguments": 'path": "."}'}},
        ]
    }

    snapshot: dict[object, object] = {}
    snapshot = accumulate_delta(snapshot, opening_chunk)
    snapshot = accumulate_delta(snapshot, followup_chunk)

    tool_calls = snapshot["tool_calls"]
    assert isinstance(tool_calls, list)
    assert len(tool_calls) == 1, f"Expected 1 tool_call, got {len(tool_calls)}: {tool_calls}"

    merged_call = tool_calls[0]
    assert merged_call["id"] == "call_1"
    assert merged_call["function"]["name"] == "list_files"
    assert merged_call["function"]["arguments"] == ' {"path": "."}'


def test_duplicate_index_preserves_type_and_id():
    """The merged entry must retain the id and type carried by the first occurrence."""
    first_chunk = {
        "tool_calls": [
            {"index": 0, "id": "call_abc", "type": "function", "function": {"name": "search"}},
            {"index": 0, "function": {"arguments": '{"q":'}},
        ]
    }
    empty_snapshot: dict[object, object] = {}
    snapshot = accumulate_delta(empty_snapshot, first_chunk)

    entries = snapshot["tool_calls"]
    assert len(entries) == 1

    entry = entries[0]
    assert entry["id"] == "call_abc"
    assert entry["type"] == "function"
    assert entry["function"]["name"] == "search"
    assert entry["function"]["arguments"] == '{"q":'


def test_multiple_distinct_indices_in_first_chunk():
    """Entries whose indices differ must not be merged together."""
    first_chunk = {
        "tool_calls": [
            {"index": 0, "id": "call_1", "function": {"name": "fn_a"}, "type": "function"},
            {"index": 1, "id": "call_2", "function": {"name": "fn_b"}, "type": "function"},
        ]
    }
    empty_snapshot: dict[object, object] = {}
    snapshot = accumulate_delta(empty_snapshot, first_chunk)

    entries = snapshot["tool_calls"]
    assert isinstance(entries, list)
    assert len(entries) == 2

    call_a = entries[0]
    call_b = entries[1]
    assert call_a["id"] == "call_1"
    assert call_b["id"] == "call_2"


def test_multiple_indices_with_duplicates_in_first_chunk():
    """Two distinct indices, each of which is duplicated within the first chunk."""
    opening_chunk = {
        "tool_calls": [
            {"index": 0, "id": "call_1", "function": {"name": "fn_a"}, "type": "function"},
            {"index": 0, "function": {"arguments": '{"a":'}},
            {"index": 1, "id": "call_2", "function": {"name": "fn_b"}, "type": "function"},
            {"index": 1, "function": {"arguments": '{"b":'}},
        ]
    }
    # Remaining argument text for both tool calls.
    followup_chunk = {
        "tool_calls": [
            {"index": 0, "function": {"arguments": '"x"}'}},
            {"index": 1, "function": {"arguments": '"y"}'}},
        ]
    }

    snapshot: dict[object, object] = {}
    for chunk in (opening_chunk, followup_chunk):
        snapshot = accumulate_delta(snapshot, chunk)

    entries = snapshot["tool_calls"]
    assert isinstance(entries, list)
    assert len(entries) == 2

    call_a = entries[0]
    assert call_a["id"] == "call_1"
    assert call_a["function"]["arguments"] == '{"a":"x"}'

    call_b = entries[1]
    assert call_b["id"] == "call_2"
    assert call_b["function"]["arguments"] == '{"b":"y"}'


def test_non_indexed_lists_unchanged():
    """A list whose items carry no integer `index` must accumulate as before."""
    chunks = (
        {"content": ["hello", "world"]},
        {"content": ["!"]},
    )

    snapshot: dict[object, object] = {}
    for chunk in chunks:
        snapshot = accumulate_delta(snapshot, chunk)

    expected = ["hello", "world", "!"]
    assert snapshot["content"] == expected