Skip to content

Commit fd9c3b5

Browse files
feat: Update to support python 3.10 (#85)
* feat: Update to support python 3.12 - add case statement to use the asyncio.Queue.shutdown method for 3.13+ - add special handling to allow for similar semantics as asyncio.Queue.shutdown for 3.12 Tested on multiple samples in the a2a repo and some examples in this repo * Change to 3.10 and provided detailed description about event queue usage * Update unit tests to Python3.10 * Run unit tests with both 3.10 and 3.13 * Spelling fixes * Update ruff.toml and noxfile to run on 3.10 * Update pyproject.toml with all supported versions --------- Co-authored-by: holtskinner <[email protected]>
1 parent 5b03148 commit fd9c3b5

File tree

8 files changed

+70
-14
lines changed

8 files changed

+70
-14
lines changed

.github/workflows/unit-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818

1919
strategy:
2020
matrix:
21-
python-version: ["3.13"]
21+
python-version: ["3.10", "3.13"]
2222

2323
steps:
2424
- name: Checkout code

.ruff.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
line-length = 80 # Google Style Guide §3.2: 80 columns
1010
indent-width = 4 # Google Style Guide §3.4: 4 spaces
1111

12-
target-version = "py313" # Minimum Python version
12+
target-version = "py310" # Minimum Python version
1313

1414
[lint]
1515
ignore = [

noxfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import nox
2424

2525

26-
DEFAULT_PYTHON_VERSION = '3.13'
26+
DEFAULT_PYTHON_VERSION = '3.10'
2727

2828
CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute()
2929

@@ -127,7 +127,7 @@ def format(session):
127127
session.run(
128128
'pyupgrade',
129129
'--exit-zero-even-if-changed',
130-
'--py313-plus',
130+
'--py310-plus',
131131
*lint_paths_py,
132132
)
133133
session.run(

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ description = "A2A Python SDK"
55
readme = "README.md"
66
license = { file = "LICENSE" }
77
authors = [{ name = "Google LLC", email = "[email protected]" }]
8-
requires-python = ">=3.13"
8+
requires-python = ">=3.10"
99
keywords = ["A2A", "A2A SDK", "A2A Protocol", "Agent2Agent"]
1010
dependencies = [
1111
"httpx>=0.28.1",
@@ -22,6 +22,9 @@ classifiers = [
2222
"Intended Audience :: Developers",
2323
"Programming Language :: Python",
2424
"Programming Language :: Python :: 3",
25+
"Programming Language :: Python :: 3.10",
26+
"Programming Language :: Python :: 3.11",
27+
"Programming Language :: Python :: 3.12",
2528
"Programming Language :: Python :: 3.13",
2629
"Operating System :: OS Independent",
2730
"Topic :: Software Development :: Libraries :: Python Modules",

src/a2a/server/events/event_consumer.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import sys
34

45
from collections.abc import AsyncGenerator
56

@@ -15,6 +16,13 @@
1516
from a2a.utils.telemetry import SpanKind, trace_class
1617

1718

19+
# This is an alias to the exception for closed queue
20+
QueueClosed = asyncio.QueueEmpty
21+
22+
# When using python 3.13 or higher, the closed queue signal is QueueShutdown
23+
if sys.version_info >= (3, 13):
24+
QueueClosed = asyncio.QueueShutDown
25+
1826
logger = logging.getLogger(__name__)
1927

2028

@@ -111,13 +119,16 @@ async def consume_all(self) -> AsyncGenerator[Event]:
111119

112120
if is_final_event:
113121
logger.debug('Stopping event consumption in consume_all.')
114-
self.queue.close()
122+
await self.queue.close()
115123
break
116124
except TimeoutError:
117125
# continue polling until there is a final event
118126
continue
119-
except asyncio.QueueShutDown:
120-
break
127+
except QueueClosed:
128+
# Confirm that the queue is closed, e.g. we aren't on
129+
# python 3.12 and get a queue empty error on an open queue
130+
if self.queue.is_closed():
131+
break
121132

122133
def agent_task_callback(self, agent_task: asyncio.Task[None]):
123134
"""Callback to handle exceptions from the agent's execution task.

src/a2a/server/events/event_queue.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import sys
34

45
from a2a.types import (
56
A2AError,
@@ -39,6 +40,8 @@ def __init__(self) -> None:
3940
"""Initializes the EventQueue."""
4041
self.queue: asyncio.Queue[Event] = asyncio.Queue()
4142
self._children: list[EventQueue] = []
43+
self._is_closed = False
44+
self._lock = asyncio.Lock()
4245
logger.debug('EventQueue initialized.')
4346

4447
def enqueue_event(self, event: Event):
@@ -47,6 +50,9 @@ def enqueue_event(self, event: Event):
4750
Args:
4851
event: The event object to enqueue.
4952
"""
53+
if self._is_closed:
54+
logger.warning('Queue is closed. Event will not be enqueued.')
55+
return
5056
logger.debug(f'Enqueuing event of type: {type(event)}')
5157
self.queue.put_nowait(event)
5258
for child in self._children:
@@ -55,6 +61,20 @@ def enqueue_event(self, event: Event):
5561
async def dequeue_event(self, no_wait: bool = False) -> Event:
5662
"""Dequeues an event from the queue.
5763
64+
This implementation expects that dequeue to raise an exception when
65+
the queue has been closed. In python 3.13+ this is naturally provided
66+
by the QueueShutDown exception generated when the queue has closed and
67+
the user is awaiting the queue.get method. Python<=3.12 this needs to
68+
manage this lifecycle itself. The current implementation can lead to
69+
blocking if the dequeue_event is called before the EventQueue has been
70+
closed but when there are no events on the queue. Two ways to avoid this
71+
are to call this with no_wait = True which won't block, but is the
72+
callers responsibility to retry as appropriate. Alternatively, one can
73+
use a async Task management solution to cancel the get task if the queue
74+
has closed or some other condition is met. The implementation of the
75+
EventConsumer uses an async.wait with a timeout to abort the
76+
dequeue_event call and retry, when it will return with a closed error.
77+
5878
Args:
5979
no_wait: If True, retrieve an event immediately or raise `asyncio.QueueEmpty`.
6080
If False (default), wait until an event is available.
@@ -66,6 +86,11 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
6686
asyncio.QueueEmpty: If `no_wait` is True and the queue is empty.
6787
asyncio.QueueShutDown: If the queue has been closed and is empty.
6888
"""
89+
async with self._lock:
90+
if self._is_closed and self.queue.empty():
91+
logger.warning('Queue is closed. Event will not be dequeued.')
92+
raise asyncio.QueueEmpty('Queue is closed.')
93+
6994
if no_wait:
7095
logger.debug('Attempting to dequeue event (no_wait=True).')
7196
event = self.queue.get_nowait()
@@ -99,13 +124,30 @@ def tap(self) -> 'EventQueue':
99124
self._children.append(queue)
100125
return queue
101126

102-
def close(self):
127+
async def close(self):
103128
"""Closes the queue for future push events.
104129
105130
Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
106131
when the queue is empty. Also closes all child queues.
107132
"""
108133
logger.debug('Closing EventQueue.')
109-
self.queue.shutdown()
110-
for child in self._children:
111-
child.close()
134+
async with self._lock:
135+
# If already closed, just return.
136+
if self._is_closed:
137+
return
138+
self._is_closed = True
139+
# If using python 3.13 or higher, use the shutdown method
140+
if sys.version_info >= (3, 13):
141+
self.queue.shutdown()
142+
for child in self._children:
143+
child.close()
144+
# Otherwise, join the queue
145+
else:
146+
tasks = [asyncio.create_task(self.queue.join())]
147+
for child in self._children:
148+
tasks.append(asyncio.create_task(child.close()))
149+
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
150+
151+
def is_closed(self) -> bool:
152+
"""Checks if the queue is closed."""
153+
return self._is_closed

src/a2a/server/events/in_memory_queue_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async def close(self, task_id: str):
6969
if task_id not in self._task_queue:
7070
raise NoTaskQueue()
7171
queue = self._task_queue.pop(task_id)
72-
queue.close()
72+
await queue.close()
7373

7474
async def create_or_tap(self, task_id: str) -> EventQueue:
7575
"""Creates a new event queue for a task ID if one doesn't exist, otherwise taps the existing one.

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async def _run_event_stream(
147147
queue: The event queue for the agent to publish to.
148148
"""
149149
await self.agent_executor.execute(request, queue)
150-
queue.close()
150+
await queue.close()
151151

152152
async def on_message_send(
153153
self, params: MessageSendParams

0 commit comments

Comments
 (0)