Skip to content

Commit 51c2d8a

Browse files
authored
fix: Event consumer should stop on input_required (#167)
Event consumer currently only stops properly for streaming events. But for sync calls that transition to input_required state, the consumer doesn't treat them as final events and results in queue closed exception. Example test: ```py task: Task= new_task(context.message) result = await self.agent.invoke() message = new_agent_text_message(result) task.status = TaskStatus(state=TaskState.input_required, message=message) event_queue.enqueue_event(task) ```
1 parent 2c4f1a7 commit 51c2d8a

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
112112
TaskState.failed,
113113
TaskState.rejected,
114114
TaskState.unknown,
115+
TaskState.input_required
115116
)
116117
)
117118
)

tests/server/events/test_event_consumer.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,22 @@ async def mock_dequeue() -> Any:
222222
assert len(consumed_events) == 1
223223
assert consumed_events[0] == events[0]
224224
assert mock_event_queue.task_done.call_count == 1
225+
226+
@pytest.mark.asyncio
227+
async def test_consume_task_input_required(
228+
event_consumer: MagicMock,
229+
mock_event_queue: MagicMock,
230+
):
231+
task = Task(**MINIMAL_TASK)
232+
task.status = TaskStatus(state=TaskState.input_required)
233+
234+
async def mock_dequeue() -> Any:
235+
return task
236+
237+
mock_event_queue.dequeue_event = mock_dequeue
238+
consumed_events: list[Any] = []
239+
#consumer should terminate on input_required task
240+
async for event in event_consumer.consume_all():
241+
consumed_events.append(event)
242+
assert len(consumed_events) == 1
243+
assert consumed_events[0] == task

0 commit comments

Comments
 (0)