Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/openai/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ def __stream__(self) -> Iterator[_T]:
response = self.response
process_data = self._client._process_response_data
iterator = self._iter_events()
done_seen = False

try:
for sse in iterator:
if sse.data.startswith("[DONE]"):
done_seen = True
break

# we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
Expand Down Expand Up @@ -106,7 +108,19 @@ def __stream__(self) -> Iterator[_T]:
response=response,
)
finally:
# Ensure the response is closed even if the consumer doesn't read all data
# Drain remaining events from the iterator so that the underlying
# response.iter_bytes() is fully consumed, including the HTTP/1.1
# chunked transfer-encoding terminator (0\r\n\r\n). Without this,
# h11's their_state won't advance to DONE, causing httpcore to
# destroy the connection (TCP FIN) instead of returning it to the pool.
# See: https://github.com/openai/openai-python/issues/3440
#
# Only drain when [DONE] was seen. On error paths (APIError,
# parse failures) we close immediately to avoid hanging on a
# stalled stream.
if done_seen:
for _ in iterator:
pass
response.close()

def __enter__(self) -> Self:
Expand Down Expand Up @@ -167,10 +181,12 @@ async def __stream__(self) -> AsyncIterator[_T]:
response = self.response
process_data = self._client._process_response_data
iterator = self._iter_events()
done_seen = False

try:
async for sse in iterator:
if sse.data.startswith("[DONE]"):
done_seen = True
break

# we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
Expand Down Expand Up @@ -216,7 +232,15 @@ async def __stream__(self) -> AsyncIterator[_T]:
response=response,
)
finally:
# Ensure the response is closed even if the consumer doesn't read all data
# Drain remaining events so the chunked terminator is consumed before close.
# See: https://github.com/openai/openai-python/issues/3440
#
# Only drain when [DONE] was seen. On error paths (APIError,
# parse failures) we close immediately to avoid hanging on a
# stalled stream.
if done_seen:
async for _ in iterator:
pass
await response.aclose()

async def __aenter__(self) -> Self:
Expand Down
2 changes: 1 addition & 1 deletion src/openai/lib/_parsing/_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def parse_response(
) -> ParsedResponse[TextFormatT]:
output_list: List[ParsedResponseOutputItem[TextFormatT]] = []

for output in response.output:
for output in (response.output or []):
if output.type == "message":
content_list: List[ParsedContent[TextFormatT]] = []
for item in output.content:
Expand Down
87 changes: 87 additions & 0 deletions tests/lib/test_parsing_responses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Tests for parse_response handling of null/None output fields."""

from __future__ import annotations

from openai._models import construct_type_unchecked
from openai._types import Omit
from openai.lib._parsing._responses import parse_response
from openai.types.responses import Response, ParsedResponse


def _make_response(output=None, **kwargs):
    """Build a ``Response`` through unchecked construction for these tests.

    ``output`` is stored under the ``"output"`` key exactly as given. Extra
    keyword arguments are merged after the defaults, so they override any
    default field with the same name.
    """
    payload = {
        "id": "resp_test123",
        "created_at": 1234567890.0,
        "model": "gpt-4o",
        "object": "response",
        "status": "completed",
        "output": output,
        "parallel_tool_calls": True,
        "tool_choice": "auto",
        "tools": [],
        "temperature": 1.0,
        "top_p": 1.0,
        # Caller-supplied overrides go last so they win over the defaults above.
        **kwargs,
    }
    return construct_type_unchecked(type_=Response, value=payload)


def test_parse_response_with_none_output():
    """A ``None`` output field parses to an empty output list instead of raising."""
    null_output_response = _make_response(output=None)
    # Guard the precondition: the unchecked constructor must keep the None.
    assert null_output_response.output is None

    parsed = parse_response(
        response=null_output_response,
        text_format=None,
        input_tools=None,
    )

    assert isinstance(parsed, ParsedResponse)
    assert parsed.output == []


def test_parse_response_with_empty_list_output():
    """An empty ``output`` list round-trips to an empty parsed output list."""
    empty_output_response = _make_response(output=[])
    assert empty_output_response.output == []

    parsed = parse_response(
        response=empty_output_response,
        text_format=None,
        input_tools=None,
    )

    assert isinstance(parsed, ParsedResponse)
    assert parsed.output == []


def test_parse_response_with_message_output():
    """A response carrying one assistant message still parses to one message item."""
    text_part = {
        "type": "output_text",
        "text": "Hello, world!",
        "annotations": [],
    }
    message_item = {
        "id": "msg_test123",
        "type": "message",
        "status": "completed",
        "role": "assistant",
        "content": [text_part],
    }
    response = _make_response(output=[message_item])

    parsed = parse_response(text_format=Omit(), input_tools=None, response=response)

    assert isinstance(parsed, ParsedResponse)
    parsed_items = parsed.output
    assert len(parsed_items) == 1
    assert parsed_items[0].type == "message"
69 changes: 69 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,72 @@ def make_event_iterator(
return AsyncStream(
cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content))
)._iter_events()


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_stream_drains_before_close(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
    """Regression test for https://github.com/openai/openai-python/issues/3440

    After [DONE], the response stream should be fully drained (including the
    chunked transfer-encoding terminator) before close() is called.

    ``response.is_stream_consumed`` alone cannot prove this: httpx sets it when
    iteration of the body begins, so it is already true for a stream that was
    closed right after [DONE]. The body is therefore produced by a generator
    that records when it has been run to exhaustion, which only happens if the
    consumer keeps reading past the final chunk.
    """
    chunks = [
        b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n',
        b'data: [DONE]\n\n',
    ]
    body_exhausted = False

    def body() -> Iterator[bytes]:
        nonlocal body_exhausted
        yield from chunks
        # Reached only when the reader asks for more data after the last chunk,
        # i.e. when the stream was drained rather than abandoned at [DONE].
        body_exhausted = True

    if sync:
        response = httpx.Response(200, content=body())
        stream = Stream(cast_to=object, client=client, response=response)
        for _ in stream:
            pass
    else:
        response = httpx.Response(200, content=to_aiter(body()))
        stream = AsyncStream(cast_to=object, client=async_client, response=response)
        async for _ in stream:
            pass

    assert response.is_stream_consumed
    assert body_exhausted


@pytest.mark.asyncio
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
async def test_stream_no_drain_on_error(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
    """An error raised before [DONE] must not trigger the drain loop.

    The body yields one error event and then an endless run of empty chunks,
    so any attempt to drain it after the error would never finish. The flag
    below records whether the body was resumed past the error event.
    """
    error_event = b'data: {"error":{"message":"bad request"}}\n\n'
    request = httpx.Request("POST", "https://example.com/v1/chat/completions")
    read_past_error = False

    def body() -> Iterator[bytes]:
        nonlocal read_past_error
        yield error_event
        # Resumed only if the consumer keeps reading after the error event;
        # a drain loop would then spin on the empty chunks below forever.
        read_past_error = True
        while True:
            yield b""

    if sync:
        sync_response = httpx.Response(200, content=body(), request=request)
        sync_stream = Stream(cast_to=object, client=client, response=sync_response)
        with pytest.raises(Exception, match="bad request"):
            for _ in sync_stream:
                pass
        assert not read_past_error
        return

    async def abody() -> AsyncIterator[bytes]:
        nonlocal read_past_error
        yield error_event
        read_past_error = True
        while True:
            yield b""

    async_response = httpx.Response(200, content=abody(), request=request)
    async_stream = AsyncStream(cast_to=object, client=async_client, response=async_response)
    with pytest.raises(Exception, match="bad request"):
        async for _ in async_stream:
            pass
    assert not read_past_error