Skip to content

Commit 4e7c8f5

Browse files
committed
Add summary to side effect and mutable side effect
1 parent 13ff29d commit 4e7c8f5

10 files changed

+257
-21
lines changed

internal/interceptor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,11 @@ type WorkflowOutboundInterceptor interface {
297297
// SideEffect intercepts workflow.SideEffect.
298298
SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue
299299

300+
// SideEffectWithOptions intercepts workflow.SideEffectWithOptions.
301+
//
302+
// NOTE: Experimental
303+
SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue
304+
300305
// MutableSideEffect intercepts workflow.MutableSideEffect.
301306
MutableSideEffect(
302307
ctx Context,
@@ -305,6 +310,17 @@ type WorkflowOutboundInterceptor interface {
305310
equals func(a, b interface{}) bool,
306311
) converter.EncodedValue
307312

313+
// MutableSideEffectWithOptions intercepts workflow.MutableSideEffectWithOptions.
314+
//
315+
// NOTE: Experimental
316+
MutableSideEffectWithOptions(
317+
ctx Context,
318+
id string,
319+
options MutableSideEffectOptions,
320+
f func(ctx Context) interface{},
321+
equals func(a, b interface{}) bool,
322+
) converter.EncodedValue
323+
308324
// GetVersion intercepts workflow.GetVersion.
309325
GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version
310326

internal/interceptor_base.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,15 @@ func (w *WorkflowOutboundInterceptorBase) SideEffect(
363363
return w.Next.SideEffect(ctx, f)
364364
}
365365

366+
// SideEffectWithOptions implements WorkflowOutboundInterceptor.SideEffectWithOptions.
367+
func (w *WorkflowOutboundInterceptorBase) SideEffectWithOptions(
368+
ctx Context,
369+
options SideEffectOptions,
370+
f func(ctx Context) interface{},
371+
) converter.EncodedValue {
372+
return w.Next.SideEffectWithOptions(ctx, options, f)
373+
}
374+
366375
// MutableSideEffect implements WorkflowOutboundInterceptor.MutableSideEffect.
367376
func (w *WorkflowOutboundInterceptorBase) MutableSideEffect(
368377
ctx Context,
@@ -373,6 +382,17 @@ func (w *WorkflowOutboundInterceptorBase) MutableSideEffect(
373382
return w.Next.MutableSideEffect(ctx, id, f, equals)
374383
}
375384

385+
// MutableSideEffectWithOptions implements WorkflowOutboundInterceptor.MutableSideEffectWithOptions.
386+
func (w *WorkflowOutboundInterceptorBase) MutableSideEffectWithOptions(
387+
ctx Context,
388+
id string,
389+
options MutableSideEffectOptions,
390+
f func(ctx Context) interface{},
391+
equals func(a, b interface{}) bool,
392+
) converter.EncodedValue {
393+
return w.Next.MutableSideEffectWithOptions(ctx, id, options, f, equals)
394+
}
395+
376396
// GetVersion implements WorkflowOutboundInterceptor.GetVersion.
377397
func (w *WorkflowOutboundInterceptorBase) GetVersion(
378398
ctx Context,

internal/internal_command_state_machine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,7 @@ func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, sea
13221322
}
13231323
}
13241324

1325-
func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
1325+
func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter, userMetadata *sdk.UserMetadata) commandStateMachine {
13261326
markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID)
13271327
sideEffectIDPayload, err := dc.ToPayloads(sideEffectID)
13281328
if err != nil {
@@ -1336,7 +1336,7 @@ func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *common
13361336
sideEffectMarkerDataName: data,
13371337
},
13381338
}
1339-
command := h.newMarkerCommandStateMachine(markerID, attributes, nil)
1339+
command := h.newMarkerCommandStateMachine(markerID, attributes, userMetadata)
13401340
h.addCommand(command)
13411341
return command
13421342
}
@@ -1359,7 +1359,7 @@ func (h *commandsHelper) recordLocalActivityMarker(activityID string, details ma
13591359
return command
13601360
}
13611361

1362-
func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, callCountHint int, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
1362+
func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, callCountHint int, data *commonpb.Payloads, dc converter.DataConverter, userMetadata *sdk.UserMetadata) commandStateMachine {
13631363
// In order to avoid duplicate marker IDs, we must append the counter to the
13641364
// user-provided ID
13651365
mutableSideEffectID = fmt.Sprintf("%v_%v", mutableSideEffectID, h.getNextID())
@@ -1383,7 +1383,7 @@ func (h *commandsHelper) recordMutableSideEffectMarker(mutableSideEffectID strin
13831383
mutableSideEffectCallCounterName: mutableSideEffectCounterPayload,
13841384
},
13851385
}
1386-
command := h.newMarkerCommandStateMachine(markerID, attributes, nil)
1386+
command := h.newMarkerCommandStateMachine(markerID, attributes, userMetadata)
13871387
h.addCommand(command)
13881388
return command
13891389
}

internal/internal_command_state_machine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ func Test_MarkerStateMachine(t *testing.T) {
553553
h := newCommandsHelper()
554554

555555
// record marker for side effect
556-
d := h.recordSideEffectMarker(1, nil, converter.GetDefaultDataConverter())
556+
d := h.recordSideEffectMarker(1, nil, converter.GetDefaultDataConverter(), nil)
557557
require.Equal(t, commandStateCreated, d.getState())
558558

559559
// send commands

internal/internal_event_handlers.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,7 @@ func getChangeVersion(changeID string, version Version) string {
932932
return fmt.Sprintf("%s-%v", changeID, version)
933933
}
934934

935-
func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
935+
func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, summary string) {
936936
sideEffectID := wc.getNextSideEffectID()
937937
var result *commonpb.Payloads
938938
if wc.isReplay {
@@ -961,7 +961,11 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, erro
961961
}
962962
}
963963

964-
wc.commandsHelper.recordSideEffectMarker(sideEffectID, result, wc.dataConverter)
964+
userMetadata, err := buildUserMetadata(summary, "", wc.dataConverter)
965+
if err != nil {
966+
panic(fmt.Sprintf("failed to build user metadata for side effect: %v", err))
967+
}
968+
wc.commandsHelper.recordSideEffectMarker(sideEffectID, result, wc.dataConverter, userMetadata)
965969

966970
callback(result, nil)
967971
wc.logger.Debug("SideEffect Marker added", tagSideEffectID, sideEffectID)
@@ -1033,7 +1037,7 @@ func (wc *workflowEnvironmentImpl) lookupMutableSideEffect(id string) *commonpb.
10331037
return payloads
10341038
}
10351039

1036-
func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
1040+
func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool, summary string) converter.EncodedValue {
10371041
wc.mutableSideEffectCallCounter[id]++
10381042
callCount := wc.mutableSideEffectCallCounter[id]
10391043

@@ -1044,7 +1048,7 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa
10441048
// recorded on the next task. We have to append the current command
10451049
// counter to the user-provided ID to avoid duplicates.
10461050
if wc.mutableSideEffectsRecorded[fmt.Sprintf("%v_%v", id, wc.commandsHelper.getNextID())] {
1047-
return wc.recordMutableSideEffect(id, callCount, result)
1051+
return wc.recordMutableSideEffect(id, callCount, result, summary)
10481052
}
10491053
return encodedResult
10501054
}
@@ -1054,15 +1058,15 @@ func (wc *workflowEnvironmentImpl) MutableSideEffect(id string, f func() interfa
10541058
return encodedResult
10551059
}
10561060

1057-
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(newValue))
1061+
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(newValue), summary)
10581062
}
10591063

10601064
if wc.isReplay {
10611065
// This should not happen
10621066
panicIllegalState(fmt.Sprintf("[TMPRL1100] Non deterministic workflow code change detected. MutableSideEffect API call doesn't have a correspondent event in the workflow history. MutableSideEffect ID: %s", id))
10631067
}
10641068

1065-
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()))
1069+
return wc.recordMutableSideEffect(id, callCount, wc.encodeValue(f()), summary)
10661070
}
10671071

10681072
func (wc *workflowEnvironmentImpl) isEqualValue(newValue interface{}, encodedOldValue *commonpb.Payloads, equals func(a, b interface{}) bool) bool {
@@ -1098,12 +1102,16 @@ func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payload
10981102
return wc.GetDataConverter().ToPayloads(arg)
10991103
}
11001104

1101-
func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, callCountHint int, data *commonpb.Payloads) converter.EncodedValue {
1105+
func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, callCountHint int, data *commonpb.Payloads, summary string) converter.EncodedValue {
11021106
details, err := encodeArgs(wc.GetDataConverter(), []interface{}{id, data})
11031107
if err != nil {
11041108
panic(err)
11051109
}
1106-
wc.commandsHelper.recordMutableSideEffectMarker(id, callCountHint, details, wc.dataConverter)
1110+
userMetadata, err := buildUserMetadata(summary, "", wc.dataConverter)
1111+
if err != nil {
1112+
panic(fmt.Sprintf("failed to build user metadata for mutable side effect: %v", err))
1113+
}
1114+
wc.commandsHelper.recordMutableSideEffectMarker(id, callCountHint, details, wc.dataConverter, userMetadata)
11071115
if wc.mutableSideEffect[id] == nil {
11081116
wc.mutableSideEffect[id] = make(map[int]*commonpb.Payloads)
11091117
}

internal/internal_worker_base.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type (
7979
AsyncActivityClient
8080
LocalActivityClient
8181
WorkflowTimerClient
82-
SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler)
82+
SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, summary string)
8383
GetVersion(changeID string, minSupported, maxSupported Version) Version
8484
WorkflowInfo() *WorkflowInfo
8585
TypedSearchAttributes() SearchAttributes
@@ -114,7 +114,7 @@ type (
114114
handler func(string, string, *commonpb.Payloads, *commonpb.Header, UpdateCallbacks),
115115
)
116116
IsReplaying() bool
117-
MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool) converter.EncodedValue
117+
MutableSideEffect(id string, f func() interface{}, equals func(a, b interface{}) bool, summary string) converter.EncodedValue
118118
GetDataConverter() converter.DataConverter
119119
GetFailureConverter() converter.FailureConverter
120120
AddSession(sessionInfo *SessionInfo)

internal/internal_workflow_testsuite.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2740,7 +2740,7 @@ func (env *testWorkflowEnvironmentImpl) makeUniqueNexusOperationToken(
27402740
return fmt.Sprintf("%s_%s_%s", service, operation, token)
27412741
}
27422742

2743-
func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) {
2743+
func (env *testWorkflowEnvironmentImpl) SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler, _ string) {
27442744
callback(f())
27452745
}
27462746

@@ -2872,7 +2872,7 @@ func (env *testWorkflowEnvironmentImpl) UpsertMemo(memoMap map[string]interface{
28722872
return err
28732873
}
28742874

2875-
func (env *testWorkflowEnvironmentImpl) MutableSideEffect(_ string, f func() interface{}, _ func(a, b interface{}) bool) converter.EncodedValue {
2875+
func (env *testWorkflowEnvironmentImpl) MutableSideEffect(_ string, f func() interface{}, _ func(a, b interface{}) bool, _ string) converter.EncodedValue {
28762876
return newEncodedValue(env.encodeValue(f()), env.GetDataConverter())
28772877
}
28782878

internal/workflow.go

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,36 @@ type (
604604
// NOTE: Experimental
605605
TimerOptions TimerOptions
606606
}
607+
608+
// SideEffectOptions are options for executing a side effect.
609+
//
610+
// NOTE: Experimental
611+
//
612+
// Exposed as: [go.temporal.io/sdk/workflow.SideEffectOptions]
613+
SideEffectOptions struct {
614+
// Summary is a single-line summary of this side effect that will appear in UI/CLI.
615+
// This can be in single-line Temporal Markdown format.
616+
//
617+
// Optional: defaults to none/empty.
618+
//
619+
// NOTE: Experimental
620+
Summary string
621+
}
622+
623+
// MutableSideEffectOptions are options for executing a mutable side effect.
624+
//
625+
// NOTE: Experimental
626+
//
627+
// Exposed as: [go.temporal.io/sdk/workflow.MutableSideEffectOptions]
628+
MutableSideEffectOptions struct {
629+
// Summary is a single-line summary of this side effect that will appear in UI/CLI.
630+
// This can be in single-line Temporal Markdown format.
631+
//
632+
// Optional: defaults to none/empty.
633+
//
634+
// NOTE: Experimental
635+
Summary string
636+
}
607637
)
608638

609639
// Await blocks the calling thread until condition() returns true
@@ -2057,12 +2087,32 @@ func (b EncodedValue) HasValue() bool {
20572087
//
20582088
// Exposed as: [go.temporal.io/sdk/workflow.SideEffect]
20592089
func SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue {
2090+
return SideEffectWithOptions(ctx, SideEffectOptions{}, f)
2091+
}
2092+
2093+
// SideEffectWithOptions executes the provided function once, records its result into the workflow history.
2094+
// The recorded result on history will be returned without executing the provided function during replay.
2095+
// This guarantees the deterministic requirement for workflow as the exact same result will be returned in replay.
2096+
// Common use case is to run some short non-deterministic code in workflow, like getting random number or new UUID.
2097+
// The only way to fail SideEffect is to panic which causes workflow task failure. The workflow task after timeout is
2098+
// rescheduled and re-executed giving SideEffect another chance to succeed.
2099+
//
2100+
// The options parameter allows specifying additional options like a summary that will be displayed in UI/CLI.
2101+
//
2102+
// NOTE: Experimental
2103+
//
2104+
// Exposed as: [go.temporal.io/sdk/workflow.SideEffectWithOptions]
2105+
func SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue {
20602106
assertNotInReadOnlyState(ctx)
20612107
i := getWorkflowOutboundInterceptor(ctx)
2062-
return i.SideEffect(ctx, f)
2108+
return i.SideEffectWithOptions(ctx, options, f)
20632109
}
20642110

20652111
func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue {
2112+
return wc.SideEffectWithOptions(ctx, SideEffectOptions{}, f)
2113+
}
2114+
2115+
func (wc *workflowEnvironmentInterceptor) SideEffectWithOptions(ctx Context, options SideEffectOptions, f func(ctx Context) interface{}) converter.EncodedValue {
20662116
dc := getDataConverterFromWorkflowContext(ctx)
20672117
future, settable := NewFuture(ctx)
20682118
wrapperFunc := func() (*commonpb.Payloads, error) {
@@ -2075,7 +2125,7 @@ func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Con
20752125
resultCallback := func(result *commonpb.Payloads, err error) {
20762126
settable.Set(EncodedValue{result, dc}, err)
20772127
}
2078-
wc.env.SideEffect(wrapperFunc, resultCallback)
2128+
wc.env.SideEffect(wrapperFunc, resultCallback, options.Summary)
20792129
var encoded EncodedValue
20802130
if err := future.Get(ctx, &encoded); err != nil {
20812131
panic(err)
@@ -2102,19 +2152,38 @@ func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Con
21022152
//
21032153
// Exposed as: [go.temporal.io/sdk/workflow.MutableSideEffect]
21042154
func MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
2155+
return MutableSideEffectWithOptions(ctx, id, MutableSideEffectOptions{}, f, equals)
2156+
}
2157+
2158+
// MutableSideEffectWithOptions executes the provided function once, then it looks up the history for the value with the given id.
2159+
// If there is no existing value, then it records the function result as a value with the given id on history;
2160+
// otherwise, it compares whether the existing value from history has changed from the new function result by calling
2161+
// the provided equals function. If they are equal, it returns the value without recording a new one in history;
2162+
// otherwise, it records the new value with the same id on history.
2163+
//
2164+
// The options parameter allows specifying additional options like a summary that will be displayed in UI/CLI.
2165+
//
2166+
// NOTE: Experimental
2167+
//
2168+
// Exposed as: [go.temporal.io/sdk/workflow.MutableSideEffectWithOptions]
2169+
func MutableSideEffectWithOptions(ctx Context, id string, options MutableSideEffectOptions, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
21052170
assertNotInReadOnlyState(ctx)
21062171
i := getWorkflowOutboundInterceptor(ctx)
2107-
return i.MutableSideEffect(ctx, id, f, equals)
2172+
return i.MutableSideEffectWithOptions(ctx, id, options, f, equals)
21082173
}
21092174

21102175
func (wc *workflowEnvironmentInterceptor) MutableSideEffect(ctx Context, id string, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
2176+
return wc.MutableSideEffectWithOptions(ctx, id, MutableSideEffectOptions{}, f, equals)
2177+
}
2178+
2179+
func (wc *workflowEnvironmentInterceptor) MutableSideEffectWithOptions(ctx Context, id string, options MutableSideEffectOptions, f func(ctx Context) interface{}, equals func(a, b interface{}) bool) converter.EncodedValue {
21112180
wrapperFunc := func() interface{} {
21122181
coroutineState := getState(ctx)
21132182
defer coroutineState.dispatcher.setIsReadOnly(false)
21142183
coroutineState.dispatcher.setIsReadOnly(true)
21152184
return f(ctx)
21162185
}
2117-
return wc.env.MutableSideEffect(id, wrapperFunc, equals)
2186+
return wc.env.MutableSideEffect(id, wrapperFunc, equals, options.Summary)
21182187
}
21192188

21202189
// DefaultVersion is a version returned by GetVersion for code that wasn't versioned before

0 commit comments

Comments
 (0)