Skip to content

Commit a981605

Browse files
feat: Add gRPC server and client support (#162)
Introduces gRPC support at the server and client interfaces. - Does not require changes to the AgentExecutors, simply a different wrapper at the binary server level - Example to follow in a2a-samples - Will introduce spec change to enable announcement of supported transport(s) by url for an agent in a following PR --------- Co-authored-by: Holt Skinner <[email protected]> Co-authored-by: Holt Skinner <[email protected]>
1 parent 51c2d8a commit a981605

File tree

19 files changed

+2606
-6
lines changed

19 files changed

+2606
-6
lines changed

.github/actions/spelling/allow.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,24 @@ ACard
22
AClient
33
AError
44
AFast
5+
AGrpc
56
ARequest
67
ARun
78
AServer
89
AServers
10+
AService
911
AStarlette
1012
EUR
1113
GBP
1214
INR
1315
JPY
1416
JSONRPCt
1517
Llm
18+
RUF
1619
aconnect
1720
adk
1821
agentic
22+
aio
1923
autouse
2024
cla
2125
cls
@@ -34,6 +38,7 @@ linting
3438
oauthoidc
3539
opensource
3640
protoc
41+
pyi
3742
pyversions
3843
socio
3944
sse

.github/actions/spelling/excludes.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
\.zip$
8686
^\.github/actions/spelling/
8787
^\.github/workflows/
88-
^\Qsrc/a2a/auth/__init__.py\E$
89-
^\Qsrc/a2a/server/request_handlers/context.py\E$
9088
CHANGELOG.md
9189
noxfile.py
90+
^src/a2a/grpc/

.github/linters/.jscpd.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"ignore": ["**/.github/**", "**/.git/**", "**/tests/**"],
2+
"ignore": ["**/.github/**", "**/.git/**", "**/tests/**", "**/src/a2a/grpc/**", "**/.nox/**", "**/.venv/**"],
33
"threshold": 3,
44
"reporters": ["html", "markdown"]
55
}

.github/linters/.ruff.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ exclude = [
8282
"venv",
8383
"*/migrations/*",
8484
"noxfile.py",
85+
"src/a2a/grpc/**",
8586
]
8687

8788
[lint.isort]
@@ -137,9 +138,14 @@ inline-quotes = "single"
137138
"SLF001",
138139
]
139140
"types.py" = ["D", "E501", "N815"] # Ignore docstring and annotation issues in types.py
141+
"proto_utils.py" = ["D102", "PLR0911"]
142+
"helpers.py" = ["ANN001", "ANN201", "ANN202"]
140143

141144
[format]
142-
exclude = ["types.py"]
145+
exclude = [
146+
"types.py",
147+
"src/a2a/grpc/**",
148+
]
143149
docstring-code-format = true
144150
docstring-code-line-length = "dynamic" # Or set to 80
145151
quote-style = "single"

.github/workflows/linter.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,5 @@ jobs:
6464
VALIDATE_GIT_COMMITLINT: false
6565
PYTHON_MYPY_CONFIG_FILE: .mypy.ini
6666
FILTER_REGEX_INCLUDE: ".*src/**/*"
67+
FILTER_REGEX_EXCLUDE: ".*src/a2a/grpc/**/*"
6768
PYTHON_RUFF_CONFIG_FILE: .ruff.toml

buf.gen.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ managed:
1919
plugins:
2020
# Generate python protobuf related code
2121
# Generates *_pb2.py files, one for each .proto
22-
- remote: buf.build/protocolbuffers/python
22+
- remote: buf.build/protocolbuffers/python:v29.3
2323
out: src/a2a/grpc
2424
# Generate python service code.
2525
# Generates *_pb2_grpc.py
2626
- remote: buf.build/grpc/python
2727
out: src/a2a/grpc
28+
# Generates *_pb2.pyi files.
29+
- remote: buf.build/protocolbuffers/pyi:v29.3
30+
out: src/a2a/grpc

noxfile.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,17 @@ def format(session) -> None:
103103
}
104104
)
105105

106-
lint_paths_py = [f for f in changed_files if f.endswith('.py')]
106+
lint_paths_py = [
107+
f for f in changed_files if f.endswith('.py') and 'grpc/' not in f
108+
]
107109

108110
if not lint_paths_py:
109111
session.log('No changed Python files to lint.')
110112
return
111113

112114
session.install(
113115
'types-requests',
116+
'types-protobuf',
114117
'pyupgrade',
115118
'autoflake',
116119
'ruff',

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ dependencies = [
1111
"fastapi>=0.115.12",
1212
"httpx>=0.28.1",
1313
"httpx-sse>=0.4.0",
14+
"google-api-core>=1.26.0",
1415
"opentelemetry-api>=1.33.0",
1516
"opentelemetry-sdk>=1.33.0",
1617
"pydantic>=2.11.3",
1718
"sse-starlette>=2.3.3",
1819
"starlette>=0.46.2",
20+
"grpcio>=1.60",
21+
"grpcio-tools>=1.60",
22+
"grpcio_reflection>=1.7.0",
1923
]
2024

2125
classifiers = [

src/a2a/client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
A2AClientHTTPError,
77
A2AClientJSONError,
88
)
9+
from a2a.client.grpc_client import A2AGrpcClient
910
from a2a.client.helpers import create_text_message_object
1011

1112

@@ -15,5 +16,6 @@
1516
'A2AClientError',
1617
'A2AClientHTTPError',
1718
'A2AClientJSONError',
19+
'A2AGrpcClient',
1820
'create_text_message_object',
1921
]

src/a2a/client/grpc_client.py

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import logging
2+
3+
from collections.abc import AsyncGenerator
4+
5+
import grpc
6+
7+
from a2a.grpc import a2a_pb2, a2a_pb2_grpc
8+
from a2a.types import (
9+
AgentCard,
10+
Message,
11+
MessageSendParams,
12+
Task,
13+
TaskArtifactUpdateEvent,
14+
TaskIdParams,
15+
TaskPushNotificationConfig,
16+
TaskQueryParams,
17+
TaskStatusUpdateEvent,
18+
)
19+
from a2a.utils import proto_utils
20+
from a2a.utils.telemetry import SpanKind, trace_class
21+
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
@trace_class(kind=SpanKind.CLIENT)
27+
class A2AGrpcClient:
28+
"""A2A Client for interacting with an A2A agent via gRPC."""
29+
30+
def __init__(
31+
self,
32+
grpc_stub: a2a_pb2_grpc.A2AServiceStub,
33+
agent_card: AgentCard,
34+
):
35+
"""Initializes the A2AGrpcClient.
36+
37+
Requires an `AgentCard`
38+
39+
Args:
40+
grpc_stub: A grpc client stub.
41+
agent_card: The agent card object.
42+
"""
43+
self.agent_card = agent_card
44+
self.stub = grpc_stub
45+
46+
async def send_message(
47+
self,
48+
request: MessageSendParams,
49+
) -> Task | Message:
50+
"""Sends a non-streaming message request to the agent.
51+
52+
Args:
53+
request: The `MessageSendParams` object containing the message and configuration.
54+
55+
Returns:
56+
A `Task` or `Message` object containing the agent's response.
57+
"""
58+
response = await self.stub.SendMessage(
59+
a2a_pb2.SendMessageRequest(
60+
request=proto_utils.ToProto.message(request.message),
61+
configuration=proto_utils.ToProto.message_send_configuration(
62+
request.configuration
63+
),
64+
metadata=proto_utils.ToProto.metadata(request.metadata),
65+
)
66+
)
67+
if response.task:
68+
return proto_utils.FromProto.task(response.task)
69+
return proto_utils.FromProto.message(response.msg)
70+
71+
async def send_message_streaming(
72+
self,
73+
request: MessageSendParams,
74+
) -> AsyncGenerator[
75+
Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
76+
]:
77+
"""Sends a streaming message request to the agent and yields responses as they arrive.
78+
79+
This method uses gRPC streams to receive a stream of updates from the
80+
agent.
81+
82+
Args:
83+
request: The `MessageSendParams` object containing the message and configuration.
84+
85+
Yields:
86+
`Message` or `Task` or `TaskStatusUpdateEvent` or
87+
`TaskArtifactUpdateEvent` objects as they are received in the
88+
stream.
89+
"""
90+
stream = self.stub.SendStreamingMessage(
91+
a2a_pb2.SendMessageRequest(
92+
request=proto_utils.ToProto.message(request.message),
93+
configuration=proto_utils.ToProto.message_send_configuration(
94+
request.configuration
95+
),
96+
metadata=proto_utils.ToProto.metadata(request.metadata),
97+
)
98+
)
99+
while True:
100+
response = await stream.read()
101+
if response == grpc.aio.EOF:
102+
break
103+
if response.HasField('msg'):
104+
yield proto_utils.FromProto.message(response.msg)
105+
elif response.HasField('task'):
106+
yield proto_utils.FromProto.task(response.task)
107+
elif response.HasField('status_update'):
108+
yield proto_utils.FromProto.task_status_update_event(
109+
response.status_update
110+
)
111+
elif response.HasField('artifact_update'):
112+
yield proto_utils.FromProto.task_artifact_update_event(
113+
response.artifact_update
114+
)
115+
116+
async def get_task(
117+
self,
118+
request: TaskQueryParams,
119+
) -> Task:
120+
"""Retrieves the current state and history of a specific task.
121+
122+
Args:
123+
request: The `TaskQueryParams` object specifying the task ID
124+
125+
Returns:
126+
A `Task` object containing the Task or None.
127+
"""
128+
task = await self.stub.GetTask(
129+
a2a_pb2.GetTaskRequest(name=f'tasks/{request.id}')
130+
)
131+
return proto_utils.FromProto.task(task)
132+
133+
async def cancel_task(
134+
self,
135+
request: TaskIdParams,
136+
) -> Task:
137+
"""Requests the agent to cancel a specific task.
138+
139+
Args:
140+
request: The `TaskIdParams` object specifying the task ID.
141+
142+
Returns:
143+
A `Task` object containing the updated Task
144+
"""
145+
task = await self.stub.CancelTask(
146+
a2a_pb2.CancelTaskRequest(name=f'tasks/{request.id}')
147+
)
148+
return proto_utils.FromProto.task(task)
149+
150+
async def set_task_callback(
151+
self,
152+
request: TaskPushNotificationConfig,
153+
) -> TaskPushNotificationConfig:
154+
"""Sets or updates the push notification configuration for a specific task.
155+
156+
Args:
157+
request: The `TaskPushNotificationConfig` object specifying the task ID and configuration.
158+
159+
Returns:
160+
A `TaskPushNotificationConfig` object containing the config.
161+
"""
162+
config = await self.stub.CreateTaskPushNotification(
163+
a2a_pb2.CreateTaskPushNotificationRequest(
164+
parent='',
165+
config_id='',
166+
config=proto_utils.ToProto.task_push_notification_config(
167+
request
168+
),
169+
)
170+
)
171+
return proto_utils.FromProto.task_push_notification_config(config)
172+
173+
async def get_task_callback(
174+
self,
175+
request: TaskIdParams, # TODO: Update to a push id params
176+
) -> TaskPushNotificationConfig:
177+
"""Retrieves the push notification configuration for a specific task.
178+
179+
Args:
180+
request: The `TaskIdParams` object specifying the task ID.
181+
182+
Returns:
183+
A `TaskPushNotificationConfig` object containing the configuration.
184+
"""
185+
config = await self.stub.GetTaskPushNotification(
186+
a2a_pb2.GetTaskPushNotificationRequest(
187+
name=f'tasks/{request.id}/pushNotification/undefined',
188+
)
189+
)
190+
return proto_utils.FromProto.task_push_notification_config(config)

src/a2a/grpc/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)