Skip to content

Commit 0cf75bb

Browse files
committed
Message and concurrent payload visitors
1 parent 4d53976 commit 0cf75bb

17 files changed

Lines changed: 2575 additions & 0 deletions

File tree

temporal-sdk/build.gradle

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,35 @@ dependencies {
6565
java21Implementation files(sourceSets.main.output.classesDirs) { builtBy compileJava }
6666
}
6767

68+
// --- Payload visitor code generation ---
69+
// A build-time generator (compiled in its own source set against the proto classes from
70+
// temporal-serviceclient) emits GeneratedPayloadVisitor.java, which knows how to walk every
71+
// payload-bearing Temporal API message. The generated source is added to the main source set.
72+
sourceSets {
73+
payloadVisitorGenerator {
74+
java {
75+
srcDirs = ['src/payloadVisitorGenerator/java']
76+
}
77+
}
78+
}
79+
80+
dependencies {
81+
payloadVisitorGeneratorImplementation project(':temporal-serviceclient')
82+
}
83+
84+
def generatedPayloadVisitorDir = layout.buildDirectory.dir('generated/payloadvisitor/java')
85+
86+
def generatePayloadVisitor = tasks.register('generatePayloadVisitor', JavaExec) {
87+
dependsOn 'compilePayloadVisitorGeneratorJava'
88+
classpath = sourceSets.payloadVisitorGenerator.runtimeClasspath
89+
mainClass = 'io.temporal.internal.payload.visitor.gen.PayloadVisitorGenerator'
90+
args generatedPayloadVisitorDir.get().asFile.absolutePath
91+
inputs.files(sourceSets.payloadVisitorGenerator.runtimeClasspath)
92+
outputs.dir(generatedPayloadVisitorDir)
93+
}
94+
95+
sourceSets.main.java.srcDir(generatePayloadVisitor)
96+
6897
tasks.named('compileJava17Java') {
6998
options.release = 17
7099
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import com.google.protobuf.Message;
4+
5+
/**
6+
* Generated traversal for one message type: visits the message's payload fields and recurses into
7+
* its child messages. There is one per message type that can contain a payload.
8+
*/
9+
interface GeneratedVisitor {
10+
void visit(Traversal traversal, Message.Builder builder);
11+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import com.google.protobuf.Message;
4+
import java.util.function.Supplier;
5+
6+
/**
7+
* How to traverse one message type, and how to create an empty builder for it (used to unpack
8+
* {@code google.protobuf.Any} values).
9+
*/
10+
final class MessageRegistryEntry {
11+
final GeneratedVisitor visitor;
12+
final Supplier<Message.Builder> newBuilder;
13+
14+
MessageRegistryEntry(GeneratedVisitor visitor, Supplier<Message.Builder> newBuilder) {
15+
this.visitor = visitor;
16+
this.newBuilder = newBuilder;
17+
}
18+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import com.google.protobuf.MessageOrBuilder;
4+
5+
/**
6+
* Callback invoked when traversal enters a proto message. The returned value becomes the contextual
7+
* value in scope for that message and everything within it, and is restored to the enclosing value
8+
* once traversal leaves the message. The message is provided as a builder and may be inspected or
9+
* mutated.
10+
*
11+
* @param <C> type of the contextual value
12+
*/
13+
@FunctionalInterface
14+
interface MessageVisitor<C> {
15+
/**
16+
* Handles a message being entered and returns the contextual value for it and its contents.
17+
*
18+
* @param current the contextual value in scope from the enclosing message
19+
* @param message the message being entered
20+
* @return the contextual value to use for this message and its contents
21+
*/
22+
C onEnter(C current, MessageOrBuilder message);
23+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import javax.annotation.Nonnull;
4+
import javax.annotation.Nullable;
5+
6+
/**
7+
* Options for visiting the messages of a proto message, without visiting individual payloads.
8+
*
9+
* @param <C> type of the contextual value supplied to the visitor
10+
*/
11+
final class MessageVisitorOptions<C> {
12+
private final @Nonnull MessageVisitor<C> messageVisitor;
13+
private final @Nullable C initialContext;
14+
15+
private MessageVisitorOptions(Builder<C> b) {
16+
this.messageVisitor = b.messageVisitor;
17+
this.initialContext = b.initialContext;
18+
}
19+
20+
public static <C> Builder<C> newBuilder() {
21+
return new Builder<>();
22+
}
23+
24+
@Nonnull
25+
public MessageVisitor<C> getMessageVisitor() {
26+
return messageVisitor;
27+
}
28+
29+
@Nullable
30+
public C getInitialContext() {
31+
return initialContext;
32+
}
33+
34+
public static final class Builder<C> {
35+
private MessageVisitor<C> messageVisitor;
36+
private C initialContext;
37+
38+
private Builder() {}
39+
40+
/** Required. The message visitor. */
41+
public Builder<C> setMessageVisitor(@Nonnull MessageVisitor<C> messageVisitor) {
42+
this.messageVisitor = messageVisitor;
43+
return this;
44+
}
45+
46+
/** Optional. The contextual value in scope before any message is entered. */
47+
public Builder<C> setInitialContext(@Nullable C initialContext) {
48+
this.initialContext = initialContext;
49+
return this;
50+
}
51+
52+
public MessageVisitorOptions<C> build() {
53+
if (messageVisitor == null) {
54+
throw new IllegalArgumentException("messageVisitor is required");
55+
}
56+
return new MessageVisitorOptions<>(this);
57+
}
58+
}
59+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import com.google.protobuf.Message;
4+
import javax.annotation.Nonnull;
5+
6+
/**
7+
* Visits the messages within a proto message, invoking the message visitor on each, without
8+
* visiting individual payloads. Only messages that can contain a payload are visited.
9+
*
10+
* <p>This is an SDK-internal utility; it is not part of the public API.
11+
*/
12+
final class MessageVisitors {
13+
private MessageVisitors() {}
14+
15+
/** Visits the messages in {@code builder} in place. */
16+
public static <C> void visit(
17+
@Nonnull Message.Builder builder, @Nonnull MessageVisitorOptions<C> options) {
18+
Traversal traversal =
19+
new Traversal(
20+
null,
21+
options.getMessageVisitor(),
22+
options.getInitialContext(),
23+
/* skipSearchAttributes= */ false,
24+
/* skipHeaders= */ false,
25+
1,
26+
null,
27+
GeneratedPayloadVisitor.REGISTRY);
28+
traversal.dispatch(builder);
29+
traversal.execute();
30+
}
31+
32+
/**
33+
* Visits the messages in {@code message}, returning a copy with any changes applied; the input is
34+
* unchanged.
35+
*/
36+
@SuppressWarnings("unchecked")
37+
public static <C, T extends Message> T visit(
38+
@Nonnull T message, @Nonnull MessageVisitorOptions<C> options) {
39+
Message.Builder builder = message.toBuilder();
40+
visit(builder, options);
41+
return (T) builder.build();
42+
}
43+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import io.temporal.api.common.v1.Payload;
4+
import java.util.List;
5+
6+
/**
7+
* Callback for a sequence of payloads found in a proto message. The returned list replaces those
8+
* payloads; return the same list to leave them unchanged.
9+
*
10+
* <p>When the visited field holds a single payload the list has one element and the visitor must
11+
* return exactly one payload. With a concurrency limit greater than one, visits may run on multiple
12+
* threads, so implementations must be thread-safe.
13+
*
14+
* @param <C> type of the contextual value supplied to each visit
15+
*/
16+
@FunctionalInterface
17+
interface PayloadVisitor<C> {
18+
/**
19+
* Visits a sequence of payloads and returns their replacements.
20+
*
21+
* @param context the location of these payloads and the contextual value in scope
22+
* @param payloads the payloads found at this location
23+
* @return the replacement payloads
24+
*/
25+
List<Payload> visit(PayloadVisitorContext<C> context, List<Payload> payloads);
26+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import com.google.protobuf.MessageOrBuilder;
4+
import javax.annotation.Nonnull;
5+
import javax.annotation.Nullable;
6+
7+
/**
8+
* The context for one payload visitor call: the contextual value in scope and the message that
9+
* contains the payloads being visited.
10+
*
11+
* @param <C> type of the contextual value
12+
*/
13+
final class PayloadVisitorContext<C> {
14+
private final @Nullable C context;
15+
private final @Nonnull MessageOrBuilder parent;
16+
17+
PayloadVisitorContext(@Nullable C context, @Nonnull MessageOrBuilder parent) {
18+
this.context = context;
19+
this.parent = parent;
20+
}
21+
22+
/** The contextual value in scope at this location, or {@code null} if none. */
23+
@Nullable
24+
public C getContext() {
25+
return context;
26+
}
27+
28+
/** The message that directly contains the payloads being visited. */
29+
@Nonnull
30+
public MessageOrBuilder getParent() {
31+
return parent;
32+
}
33+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package io.temporal.internal.payload.visitor;
2+
3+
import java.util.concurrent.Executor;
4+
import javax.annotation.Nonnull;
5+
import javax.annotation.Nullable;
6+
7+
/**
8+
* Options for visiting the payloads of a proto message.
9+
*
10+
* @param <C> type of the contextual value supplied to the visitor
11+
*/
12+
final class PayloadVisitorOptions<C> {
13+
private final @Nonnull PayloadVisitor<C> payloadVisitor;
14+
private final @Nullable MessageVisitor<C> messageVisitor;
15+
private final @Nullable C initialContext;
16+
private final boolean skipSearchAttributes;
17+
private final boolean skipHeaders;
18+
private final int concurrency;
19+
private final @Nullable Executor executor;
20+
21+
private PayloadVisitorOptions(Builder<C> b) {
22+
this.payloadVisitor = b.payloadVisitor;
23+
this.messageVisitor = b.messageVisitor;
24+
this.initialContext = b.initialContext;
25+
this.skipSearchAttributes = b.skipSearchAttributes;
26+
this.skipHeaders = b.skipHeaders;
27+
this.concurrency = b.concurrency;
28+
this.executor = b.executor;
29+
}
30+
31+
public static <C> Builder<C> newBuilder() {
32+
return new Builder<>();
33+
}
34+
35+
@Nonnull
36+
public PayloadVisitor<C> getPayloadVisitor() {
37+
return payloadVisitor;
38+
}
39+
40+
@Nullable
41+
public MessageVisitor<C> getMessageVisitor() {
42+
return messageVisitor;
43+
}
44+
45+
@Nullable
46+
public C getInitialContext() {
47+
return initialContext;
48+
}
49+
50+
/** Whether search attribute payloads are skipped. */
51+
public boolean isSkipSearchAttributes() {
52+
return skipSearchAttributes;
53+
}
54+
55+
/** Whether header payloads are skipped. */
56+
public boolean isSkipHeaders() {
57+
return skipHeaders;
58+
}
59+
60+
/** Maximum number of visits that may run concurrently; {@code 1} is sequential. */
61+
public int getConcurrency() {
62+
return concurrency;
63+
}
64+
65+
/** Executor for concurrent visits; {@code null} when concurrency is {@code 1}. */
66+
@Nullable
67+
public Executor getExecutor() {
68+
return executor;
69+
}
70+
71+
public static final class Builder<C> {
72+
private PayloadVisitor<C> payloadVisitor;
73+
private MessageVisitor<C> messageVisitor;
74+
private C initialContext;
75+
private boolean skipSearchAttributes;
76+
private boolean skipHeaders;
77+
private int concurrency = 1;
78+
private Executor executor;
79+
80+
private Builder() {}
81+
82+
/** Required. The payload visitor. */
83+
public Builder<C> setPayloadVisitor(@Nonnull PayloadVisitor<C> payloadVisitor) {
84+
this.payloadVisitor = payloadVisitor;
85+
return this;
86+
}
87+
88+
/** Optional. A callback invoked when entering each message. */
89+
public Builder<C> setMessageVisitor(@Nullable MessageVisitor<C> messageVisitor) {
90+
this.messageVisitor = messageVisitor;
91+
return this;
92+
}
93+
94+
/** Optional. The contextual value in scope before any message is entered. */
95+
public Builder<C> setInitialContext(@Nullable C initialContext) {
96+
this.initialContext = initialContext;
97+
return this;
98+
}
99+
100+
/** Whether to skip search attribute payloads. */
101+
public Builder<C> setSkipSearchAttributes(boolean skipSearchAttributes) {
102+
this.skipSearchAttributes = skipSearchAttributes;
103+
return this;
104+
}
105+
106+
/** Whether to skip header payloads. */
107+
public Builder<C> setSkipHeaders(boolean skipHeaders) {
108+
this.skipHeaders = skipHeaders;
109+
return this;
110+
}
111+
112+
/**
113+
* Maximum number of concurrent visits; must be at least {@code 1} (the default, sequential). A
114+
* value greater than {@code 1} requires an executor (see {@link #setExecutor}).
115+
*/
116+
public Builder<C> setConcurrency(int concurrency) {
117+
this.concurrency = concurrency;
118+
return this;
119+
}
120+
121+
/** Executor for concurrent visits. Required when concurrency is greater than {@code 1}. */
122+
public Builder<C> setExecutor(@Nullable Executor executor) {
123+
this.executor = executor;
124+
return this;
125+
}
126+
127+
public PayloadVisitorOptions<C> build() {
128+
if (payloadVisitor == null) {
129+
throw new IllegalArgumentException("payloadVisitor is required");
130+
}
131+
if (concurrency < 1) {
132+
throw new IllegalArgumentException("concurrency must be at least 1, got " + concurrency);
133+
}
134+
if (concurrency > 1 && executor == null) {
135+
throw new IllegalArgumentException(
136+
"executor is required when concurrency is greater than 1");
137+
}
138+
return new PayloadVisitorOptions<>(this);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)