Skip to content

Commit 85ef704

Browse files
committed
[Fix #1087] A2A implementation
Signed-off-by: fjtirado <ftirados@ibm.com>
1 parent 3b5f7d3 commit 85ef704

20 files changed

Lines changed: 891 additions & 6 deletions

impl/a2a/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-a2a</artifactId>
9+
<name>Serverless Workflow :: Impl :: A2A</name>
10+
<dependencies>
11+
<dependency>
12+
<groupId>io.serverlessworkflow</groupId>
13+
<artifactId>serverlessworkflow-impl-core</artifactId>
14+
</dependency>
15+
<dependency>
16+
<groupId>org.a2aproject.sdk</groupId>
17+
<artifactId>a2a-java-sdk-client</artifactId>
18+
</dependency>
19+
<dependency>
20+
<groupId>org.junit.jupiter</groupId>
21+
<artifactId>junit-jupiter-engine</artifactId>
22+
<scope>test</scope>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.junit.jupiter</groupId>
26+
<artifactId>junit-jupiter-params</artifactId>
27+
<scope>test</scope>
28+
</dependency>
29+
</dependencies>
30+
</project>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.WorkflowError;
19+
import io.serverlessworkflow.impl.WorkflowException;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowPosition;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.Consumer;
24+
25+
class A2AExceptionHandler implements Consumer<Throwable> {
26+
27+
private final CompletableFuture<WorkflowModel> future;
28+
private final WorkflowPosition position;
29+
30+
A2AExceptionHandler(CompletableFuture<WorkflowModel> future, WorkflowPosition position) {
31+
this.future = future;
32+
this.position = position;
33+
}
34+
35+
@Override
36+
public void accept(Throwable ex) {
37+
38+
future.completeExceptionally(
39+
new WorkflowException(
40+
A2AUtils.workflowError(position)
41+
.title(ex.getMessage())
42+
.details(WorkflowError.getStackTrace(ex))
43+
.build(),
44+
ex));
45+
}
46+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowValueResolver;
22+
import io.serverlessworkflow.impl.executors.CallableTask;
23+
import java.net.URI;
24+
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
26+
import org.a2aproject.sdk.client.Client;
27+
import org.a2aproject.sdk.client.config.ClientConfig;
28+
import org.a2aproject.sdk.client.http.A2ACardResolver;
29+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
30+
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
31+
32+
class A2AExecutor implements CallableTask {
33+
34+
private final WorkflowValueResolver<URI> uriSupplier;
35+
private final A2ARequestDispatcher dispatcher;
36+
private final WorkflowValueResolver<Map<String, Object>> mapResolver;
37+
38+
public A2AExecutor(
39+
WorkflowValueResolver<URI> uriSupplier,
40+
A2ARequestDispatcher dispatcher,
41+
WorkflowValueResolver<Map<String, Object>> mapResolver) {
42+
this.uriSupplier = uriSupplier;
43+
this.dispatcher = dispatcher;
44+
this.mapResolver = mapResolver;
45+
}
46+
47+
@Override
48+
public CompletableFuture<WorkflowModel> apply(
49+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
50+
URI uri = uriSupplier.apply(workflowContext, taskContext, input);
51+
return dispatcher.apply(
52+
Client.builder(
53+
A2ACardResolver.builder()
54+
.baseUrl(uri.resolve("/").toString())
55+
.agentCardPath(uri.getPath())
56+
.build()
57+
.getAgentCard())
58+
.clientConfig(new ClientConfig.Builder().build())
59+
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
60+
.build(),
61+
mapResolver.apply(workflowContext, taskContext, input),
62+
workflowContext,
63+
taskContext);
64+
}
65+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.api.types.A2AArguments;
19+
import io.serverlessworkflow.api.types.CallA2A;
20+
import io.serverlessworkflow.api.types.TaskBase;
21+
import io.serverlessworkflow.impl.TaskContext;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
24+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
25+
import io.serverlessworkflow.impl.WorkflowUtils;
26+
import io.serverlessworkflow.impl.WorkflowValueResolver;
27+
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
28+
import io.serverlessworkflow.impl.executors.CallableTaskFactory;
29+
import java.net.URI;
30+
31+
public class A2AExecutorBuilder implements CallableTaskBuilder<CallA2A> {
32+
33+
@Override
34+
public boolean accept(Class<? extends TaskBase> clazz) {
35+
return CallA2A.class.equals(clazz);
36+
}
37+
38+
@Override
39+
public CallableTaskFactory init(
40+
CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) {
41+
A2AArguments args = task.getWith();
42+
43+
WorkflowValueResolver<URI> uriSupplier;
44+
if (args.getServer() != null) {
45+
uriSupplier = definition.resourceLoader().uriSupplier(args.getServer());
46+
} else if (args.getAgentCard() != null) {
47+
uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint());
48+
} else {
49+
throw new IllegalArgumentException("Neither server or agent card is set for task" + task);
50+
}
51+
52+
A2ARequestDispatcher dispatcher =
53+
switch (args.getMethod()) {
54+
case MESSAGE_SEND ->
55+
new MessageDispatcher(
56+
new MessageConsumerFactory() {
57+
@Override
58+
protected MessageConsumer buildConsumer(
59+
WorkflowContext workflowContext, TaskContext taskContext) {
60+
return new MessageSendConsumer(workflowContext.definition());
61+
}
62+
});
63+
case MESSAGE_STREAM ->
64+
new MessageDispatcher(
65+
new MessageConsumerFactory() {
66+
67+
@Override
68+
protected MessageConsumer buildConsumer(
69+
WorkflowContext workflowContext, TaskContext taskContext) {
70+
return new MessageStreamConsumer(
71+
workflowContext.definition(), taskContext.position());
72+
}
73+
});
74+
case TASKS_LIST -> new ListTaskParamsDispatcher();
75+
case TASKS_GET -> new GetTaskParamsDispatcher();
76+
case TASKS_CANCEL -> new CancelTaskParamsDispatcher();
77+
// TODO handle missing cases
78+
case AGENT_GET_AUTHENTICATED_EXTENDED_CARD,
79+
TASKS_PUSH_NOTIFICATION_CONFIG_DELETE,
80+
TASKS_PUSH_NOTIFICATION_CONFIG_GET,
81+
TASKS_PUSH_NOTIFICATION_CONFIG_LIST,
82+
TASKS_PUSH_NOTIFICATION_CONFIG_SET,
83+
TASKS_RESUBSCRIBE ->
84+
throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod());
85+
};
86+
87+
return () ->
88+
new A2AExecutor(
89+
uriSupplier,
90+
dispatcher,
91+
WorkflowUtils.buildMapResolver(
92+
definition.application(),
93+
args.getParameters().getString(),
94+
args.getParameters().getWithA2AParameters().getAdditionalProperties()));
95+
}
96+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.a2a;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.a2aproject.sdk.client.Client;
24+
25+
@FunctionalInterface
26+
interface A2ARequestDispatcher {
27+
CompletableFuture<WorkflowModel> apply(
28+
Client client,
29+
Map<String, Object> parameters,
30+
WorkflowContext workflowContext,
31+
TaskContext taskCtontext);
32+
}

0 commit comments

Comments
 (0)