Skip to content
35 changes: 35 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,41 @@ dependencies {
java21Implementation files(sourceSets.main.output.classesDirs) { builtBy compileJava }
}

// --- Payload visitor code generation ---
// A build-time generator (compiled in its own source set against the proto classes from
// temporal-serviceclient) emits GeneratedPayloadVisitor.java, which knows how to walk every
// payload-bearing Temporal API message. The generated source is added to the main source set.
sourceSets {
payloadVisitorGenerator {
java {
srcDirs = ['src/payloadVisitorGenerator/java']
}
}
}

dependencies {
payloadVisitorGeneratorImplementation project(':temporal-serviceclient')
}

def generatedPayloadVisitorDir = layout.buildDirectory.dir('generated/payloadvisitor/java')

def generatePayloadVisitor = tasks.register('generatePayloadVisitor', JavaExec) {
dependsOn 'compilePayloadVisitorGeneratorJava'
classpath = sourceSets.payloadVisitorGenerator.runtimeClasspath
mainClass = 'io.temporal.internal.payload.visitor.gen.PayloadVisitorGenerator'
args generatedPayloadVisitorDir.get().asFile.absolutePath
inputs.files(sourceSets.payloadVisitorGenerator.runtimeClasspath)
outputs.dir(generatedPayloadVisitorDir)
}

sourceSets.main.java.srcDir(generatePayloadVisitor)

tasks.named('compilePayloadVisitorGeneratorJava') {
options.encoding = 'UTF-8'
options.compilerArgs << '-Xlint:none' << '-Xlint:deprecation' << '-Werror'
options.errorprone.error('MissingCasesInEnumSwitch')
}

tasks.named('compileJava17Java') {
options.release = 17
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.temporal.internal.common;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/**
* A simple async semaphore. Unfortunately there's not any readily available properly licensed
* library I could find for this which is a bit shocking, but this implementation should be suitable
* for our needs.
*/
public final class AsyncSemaphore {
private final ReentrantLock lock = new ReentrantLock();
private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
private int permits;

public AsyncSemaphore(int initialPermits) {
this.permits = initialPermits;
}

/**
* Acquire a permit asynchronously. If a permit is available, returns a completed future,
* otherwise returns a future that will be completed when a permit is released.
*/
public CompletableFuture<Void> acquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> waiter = new CompletableFuture<>();
waiters.add(waiter);
return waiter;
}
} finally {
lock.unlock();
}
}

public boolean tryAcquire() {
lock.lock();
try {
if (permits > 0) {
permits--;
return true;
}
return false;
} finally {
lock.unlock();
}
}

/**
* Release a permit. If there are waiting futures, completes the next one instead of incrementing
* the permit count.
*/
public void release() {
lock.lock();
try {
CompletableFuture<Void> waiter = waiters.poll();
if (waiter != null) {
if (!waiter.complete(null) && waiter.isCancelled()) {
// If this waiter was cancelled, we need to release another permit, since this waiter
// is now useless
release();
}
} else {
permits++;
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.temporal.internal.payload.visitor;

import com.google.protobuf.Message;

/**
* Generated traversal for one message type: visits the message's payload fields and recurses into
* its child messages. There is one per message type that can contain a payload.
*/
interface GeneratedVisitor {
void visit(Traversal traversal, Message.Builder builder);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.temporal.internal.payload.visitor;

import com.google.protobuf.Message;
import java.util.function.Supplier;

/**
* How to traverse one message type, and how to create an empty builder for it (used to unpack
* {@code google.protobuf.Any} values).
*/
final class MessageRegistryEntry {
final GeneratedVisitor visitor;
final Supplier<Message.Builder> newBuilder;

MessageRegistryEntry(GeneratedVisitor visitor, Supplier<Message.Builder> newBuilder) {
this.visitor = visitor;
this.newBuilder = newBuilder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.temporal.internal.payload.visitor;

import com.google.protobuf.MessageOrBuilder;

/**
* Callback invoked when traversal enters a proto message. The returned value becomes the contextual
* value in scope for that message and everything within it, and is restored to the enclosing value
* once traversal leaves the message. The message is provided as a builder and may be inspected or
* mutated.
Comment thread
jmaeagle99 marked this conversation as resolved.
*
* @param <C> type of the contextual value
*/
@FunctionalInterface
interface MessageVisitor<C> {
/**
* Handles a message being entered and returns the contextual value for it and its contents.
*
* @param current the contextual value in scope from the enclosing message
* @param message the message being entered
* @return the contextual value to use for this message and its contents
*/
C onEnter(C current, MessageOrBuilder message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.temporal.internal.payload.visitor;

import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Options for visiting the messages of a proto message, without visiting individual payloads.
*
* @param <C> type of the contextual value supplied to the visitor
*/
final class MessageVisitorOptions<C> {
private final @Nonnull MessageVisitor<C> messageVisitor;
private final @Nullable C initialContext;

private MessageVisitorOptions(Builder<C> b) {
this.messageVisitor = b.messageVisitor;
this.initialContext = b.initialContext;
}

public static <C> Builder<C> newBuilder(@Nonnull MessageVisitor<C> messageVisitor) {
return new Builder<>(messageVisitor);
}

@Nonnull
public MessageVisitor<C> getMessageVisitor() {
return messageVisitor;
}

@Nullable
public C getInitialContext() {
return initialContext;
}

public static final class Builder<C> {
private final @Nonnull MessageVisitor<C> messageVisitor;
private C initialContext;

private Builder(@Nonnull MessageVisitor<C> messageVisitor) {
this.messageVisitor = Objects.requireNonNull(messageVisitor, "messageVisitor");
}

/** The contextual value in scope before any message is entered. */
public Builder<C> setInitialContext(@Nullable C initialContext) {
this.initialContext = initialContext;
return this;
}

public MessageVisitorOptions<C> build() {
return new MessageVisitorOptions<>(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.temporal.internal.payload.visitor;

import com.google.protobuf.Message;
import javax.annotation.Nonnull;

/**
* Visits the messages within a proto message, invoking the message visitor on each, without
* visiting individual payloads. Only messages that can contain a payload are visited.
*
* <p>This is an SDK-internal utility; it is not part of the public API.
*/
final class MessageVisitors {
private MessageVisitors() {}

/** Visits the messages in {@code builder} in place. */
public static <C> void visit(
@Nonnull Message.Builder builder, @Nonnull MessageVisitorOptions<C> options) {
Traversal traversal =
new Traversal(
null,
options.getMessageVisitor(),
options.getInitialContext(),
/* skipSearchAttributes= */ false,
/* skipHeaders= */ false,
1,
GeneratedPayloadVisitor.REGISTRY);
traversal.dispatch(builder);
// No payload visits, so execute() completes inline; join() returns at once. Message-visitor
// errors throw from dispatch above.
traversal.execute().join();
}

/** Returns a copy with any changes applied; {@code message} is unchanged. */
@SuppressWarnings("unchecked")
public static <C, T extends Message> T visit(
@Nonnull T message, @Nonnull MessageVisitorOptions<C> options) {
Message.Builder builder = message.toBuilder();
visit(builder, options);
return (T) builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.temporal.internal.payload.visitor;

import io.temporal.api.common.v1.Payload;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Callback completing with the list that replaces {@code payloads}; complete with the same list to
* leave them unchanged. Asynchronous so I/O-backed implementations (e.g. external storage) compose
* without blocking a thread per call; a synchronous one returns {@link
* CompletableFuture#completedFuture}.
*
* <p>For a single-payload field the visitor must complete with exactly one payload. With
* concurrency greater than one, several visits may be in flight at once, so implementations must be
* thread-safe.
*
* @param <C> type of the contextual value supplied to each visit
*/
@FunctionalInterface
interface PayloadVisitor<C> {
CompletableFuture<List<Payload>> visit(C context, List<Payload> payloads);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.temporal.internal.payload.visitor;

import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Options for visiting the payloads of a proto message.
*
* @param <C> type of the contextual value supplied to the visitor
*/
final class PayloadVisitorOptions<C> {
private final @Nonnull PayloadVisitor<C> payloadVisitor;
private final @Nullable MessageVisitor<C> messageVisitor;
private final @Nullable C initialContext;
private final boolean skipSearchAttributes;
private final boolean skipHeaders;
private final int concurrency;

private PayloadVisitorOptions(Builder<C> b) {
this.payloadVisitor = b.payloadVisitor;
this.messageVisitor = b.messageVisitor;
this.initialContext = b.initialContext;
this.skipSearchAttributes = b.skipSearchAttributes;
this.skipHeaders = b.skipHeaders;
this.concurrency = b.concurrency;
}

public static <C> Builder<C> newBuilder(@Nonnull PayloadVisitor<C> payloadVisitor) {
return new Builder<>(payloadVisitor);
}

@Nonnull
public PayloadVisitor<C> getPayloadVisitor() {
return payloadVisitor;
}

@Nullable
public MessageVisitor<C> getMessageVisitor() {
return messageVisitor;
}

@Nullable
public C getInitialContext() {
return initialContext;
}

public boolean isSkipSearchAttributes() {
return skipSearchAttributes;
}

public boolean isSkipHeaders() {
return skipHeaders;
}

public int getConcurrency() {
return concurrency;
}

public static final class Builder<C> {
private final @Nonnull PayloadVisitor<C> payloadVisitor;
private MessageVisitor<C> messageVisitor;
private C initialContext;
private boolean skipSearchAttributes;
private boolean skipHeaders;
private int concurrency = 1;

private Builder(@Nonnull PayloadVisitor<C> payloadVisitor) {
this.payloadVisitor = Objects.requireNonNull(payloadVisitor, "payloadVisitor");
}

public Builder<C> setMessageVisitor(@Nullable MessageVisitor<C> messageVisitor) {
this.messageVisitor = messageVisitor;
return this;
}

/** The contextual value in scope before any message is entered. */
public Builder<C> setInitialContext(@Nullable C initialContext) {
this.initialContext = initialContext;
return this;
}

public Builder<C> setSkipSearchAttributes(boolean skipSearchAttributes) {
this.skipSearchAttributes = skipSearchAttributes;
return this;
}

public Builder<C> setSkipHeaders(boolean skipHeaders) {
this.skipHeaders = skipHeaders;
return this;
}

/** At least {@code 1} (sequential). Bounds outstanding visit futures; no executor needed. */
public Builder<C> setConcurrency(int concurrency) {
this.concurrency = concurrency;
return this;
}

public PayloadVisitorOptions<C> build() {
if (concurrency < 1) {
throw new IllegalArgumentException("concurrency must be at least 1, got " + concurrency);
}
return new PayloadVisitorOptions<>(this);
}
}
}
Loading
Loading