Skip to content

Commit fbfc069

Browse files
authored
Enable setting of Temporal headers through cli (#883)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed Support setting of Temporal headers in workflows ## Why? The cli doesn't support the setting of workflow headers. With this change we would be able to pass the headers to Workflows using cli. ## Checklist <!--- add/delete as needed ---> 1. Closes - #876 2. How was this tested: - Brought up the dev server using : `temporal server start-dev` - Build the cli locally and passed the `--headers` flag to start a workflow. Checked in Temporal UI, the Header section shows the headers set in Workflow. 3. Any docs updates needed? https://docs.temporal.io/cli/workflow
1 parent bdf6b67 commit fbfc069

File tree

6 files changed

+140
-3
lines changed

6 files changed

+140
-3
lines changed

internal/temporalcli/client.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.temporal.io/sdk/contrib/envconfig"
1414
"go.temporal.io/sdk/converter"
1515
"go.temporal.io/sdk/log"
16+
"go.temporal.io/sdk/workflow"
1617
"google.golang.org/grpc"
1718
"google.golang.org/grpc/metadata"
1819
)
@@ -229,6 +230,8 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error)
229230
return client.DialContext(ctxWithTimeout, clientOptions)
230231
}
231232

233+
clientOptions.ContextPropagators = append(clientOptions.ContextPropagators, headerPropagator{})
234+
232235
return client.DialContext(cctx, clientOptions)
233236
}
234237

@@ -314,3 +317,43 @@ func (rawValuePayloadConverter) Encoding() string {
314317
// Should never be used
315318
return "raw-value-encoding"
316319
}
320+
321+
type headerPropagator struct{}
322+
323+
type cliHeaderContextKey struct{}
324+
325+
func (headerPropagator) Inject(ctx context.Context, writer workflow.HeaderWriter) error {
326+
if headers, ok := ctx.Value(cliHeaderContextKey{}).(map[string]any); ok {
327+
for k, v := range headers {
328+
p, err := converter.GetDefaultDataConverter().ToPayload(v)
329+
if err != nil {
330+
return err
331+
}
332+
writer.Set(k, p)
333+
}
334+
}
335+
return nil
336+
}
337+
338+
func (headerPropagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.HeaderWriter) error {
339+
return nil
340+
}
341+
342+
func (headerPropagator) Extract(ctx context.Context, _ workflow.HeaderReader) (context.Context, error) {
343+
return ctx, nil
344+
}
345+
346+
func (headerPropagator) ExtractToWorkflow(ctx workflow.Context, _ workflow.HeaderReader) (workflow.Context, error) {
347+
return ctx, nil
348+
}
349+
350+
func contextWithHeaders(ctx context.Context, headers []string) (context.Context, error) {
351+
if len(headers) == 0 {
352+
return ctx, nil
353+
}
354+
out, err := stringKeysJSONValues(headers, false)
355+
if err != nil {
356+
return ctx, err
357+
}
358+
return context.WithValue(ctx, cliHeaderContextKey{}, out), nil
359+
}

internal/temporalcli/commands.gen.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ type SingleWorkflowOrBatchOptions struct {
184184
Reason string
185185
Yes bool
186186
Rps float32
187+
Headers []string
187188
}
188189

189190
func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
@@ -193,6 +194,7 @@ func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag
193194
f.StringVar(&v.Reason, "reason", "", "Reason for batch operation. Only use with --query. Defaults to user name.")
194195
f.BoolVarP(&v.Yes, "yes", "y", false, "Don't prompt to confirm signaling. Only allowed when --query is present.")
195196
f.Float32Var(&v.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.")
197+
f.StringArrayVar(&v.Headers, "headers", nil, "Temporal workflow headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times to set multiple Temporal headers. Note: These are workflow headers, not gRPC headers.")
196198
}
197199

198200
type SharedWorkflowStartOptions struct {
@@ -203,6 +205,7 @@ type SharedWorkflowStartOptions struct {
203205
ExecutionTimeout Duration
204206
TaskTimeout Duration
205207
SearchAttribute []string
208+
Headers []string
206209
Memo []string
207210
StaticSummary string
208211
StaticDetails string
@@ -224,6 +227,7 @@ func (v *SharedWorkflowStartOptions) buildFlags(cctx *CommandContext, f *pflag.F
224227
v.TaskTimeout = Duration(10000 * time.Millisecond)
225228
f.Var(&v.TaskTimeout, "task-timeout", "Fail a Workflow Task if it lasts longer than `DURATION`. This is the Start-to-close timeout for a Workflow Task.")
226229
f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Search Attribute in `KEY=VALUE` format. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.")
230+
f.StringArrayVar(&v.Headers, "headers", nil, "Temporal workflow headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times to set multiple Temporal headers. Note: These are workflow headers, not gRPC headers.")
227231
f.StringArrayVar(&v.Memo, "memo", nil, "Memo using 'KEY=\"VALUE\"' pairs. Use JSON values.")
228232
f.StringVar(&v.StaticSummary, "static-summary", "", "Static Workflow summary for human consumption in UIs. Uses Temporal Markdown formatting, should be a single line. EXPERIMENTAL.")
229233
f.StringVar(&v.StaticDetails, "static-details", "", "Static Workflow details for human consumption in UIs. Uses Temporal Markdown formatting, may be multiple lines. EXPERIMENTAL.")
@@ -272,6 +276,7 @@ type UpdateStartingOptions struct {
272276
WorkflowId string
273277
UpdateId string
274278
RunId string
279+
Headers []string
275280
}
276281

277282
func (v *UpdateStartingOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
@@ -282,6 +287,7 @@ func (v *UpdateStartingOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSe
282287
_ = cobra.MarkFlagRequired(f, "workflow-id")
283288
f.StringVar(&v.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.")
284289
f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.")
290+
f.StringArrayVar(&v.Headers, "headers", nil, "Temporal workflow headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times to set multiple Temporal headers. Note: These are workflow headers, not gRPC headers.")
285291
}
286292

287293
type UpdateTargetingOptions struct {
@@ -325,11 +331,13 @@ func (v *NexusEndpointConfigOptions) buildFlags(cctx *CommandContext, f *pflag.F
325331

326332
type QueryModifiersOptions struct {
327333
RejectCondition StringEnum
334+
Headers []string
328335
}
329336

330337
func (v *QueryModifiersOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
331338
v.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "")
332339
f.Var(&v.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.")
340+
f.StringArrayVar(&v.Headers, "headers", nil, "Temporal workflow headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times to set multiple Temporal headers. Note: These are workflow headers, not gRPC headers.")
333341
}
334342

335343
type WorkflowUpdateOptionsOptions struct {

internal/temporalcli/commands.workflow.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,12 @@ func (c *TemporalWorkflowUpdateOptionsCommand) run(cctx *CommandContext, args []
190190

191191
func (c *TemporalWorkflowMetadataCommand) run(cctx *CommandContext, _ []string) error {
192192
return queryHelper(cctx, c.Parent, PayloadInputOptions{},
193-
metadataQueryName, c.RejectCondition, c.WorkflowReferenceOptions)
193+
metadataQueryName, nil, c.RejectCondition, c.WorkflowReferenceOptions)
194194
}
195195

196196
func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) error {
197197
return queryHelper(cctx, c.Parent, c.PayloadInputOptions,
198-
c.Name, c.RejectCondition, c.WorkflowReferenceOptions)
198+
c.Name, c.Headers, c.RejectCondition, c.WorkflowReferenceOptions)
199199
}
200200

201201
func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) error {
@@ -210,6 +210,11 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
210210
return err
211211
}
212212

213+
cctx.Context, err = contextWithHeaders(cctx.Context, c.Headers)
214+
if err != nil {
215+
return err
216+
}
217+
213218
exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{})
214219

215220
// Run single or batch
@@ -246,7 +251,7 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
246251

247252
func (c *TemporalWorkflowStackCommand) run(cctx *CommandContext, args []string) error {
248253
return queryHelper(cctx, c.Parent, PayloadInputOptions{},
249-
"__stack_trace", c.RejectCondition, c.WorkflowReferenceOptions)
254+
"__stack_trace", nil, c.RejectCondition, c.WorkflowReferenceOptions)
250255
}
251256

252257
func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) error {
@@ -449,6 +454,11 @@ func workflowUpdateHelper(cctx *CommandContext,
449454
WaitForStage: waitForStage,
450455
}
451456

457+
cctx.Context, err = contextWithHeaders(cctx.Context, updateStartOpts.Headers)
458+
if err != nil {
459+
return err
460+
}
461+
452462
updateHandle, err := cl.UpdateWorkflow(cctx, request)
453463
if err != nil {
454464
return fmt.Errorf("unable to update workflow: %w", err)
@@ -580,6 +590,7 @@ func queryHelper(cctx *CommandContext,
580590
parent *TemporalWorkflowCommand,
581591
inputOpts PayloadInputOptions,
582592
queryType string,
593+
headers []string,
583594
rejectCondition StringEnum,
584595
execution WorkflowReferenceOptions,
585596
) error {
@@ -605,6 +616,11 @@ func queryHelper(cctx *CommandContext,
605616
return fmt.Errorf("invalid query reject condition: %v, valid values are: 'not_open', 'not_completed_cleanly'", rejectCondition)
606617
}
607618

619+
cctx.Context, err = contextWithHeaders(cctx.Context, headers)
620+
if err != nil {
621+
return err
622+
}
623+
608624
result, err := cl.WorkflowService().QueryWorkflow(cctx, &workflowservice.QueryWorkflowRequest{
609625
Namespace: parent.Namespace,
610626
Execution: &common.WorkflowExecution{WorkflowId: execution.WorkflowId, RunId: execution.RunId},

internal/temporalcli/commands.workflow_exec.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s
158158
searchAttr = &common.SearchAttributes{IndexedFields: fields}
159159
}
160160

161+
cctx.Context, err = contextWithHeaders(cctx.Context, c.SharedWorkflowStartOptions.Headers)
162+
if err != nil {
163+
return err
164+
}
165+
161166
// We have to use the raw signal service call here because the Go SDK's
162167
// signal-with-start call doesn't accept multiple signal arguments.
163168
resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution(
@@ -363,6 +368,10 @@ func executeUpdateWithStartWorkflow(
363368
if err != nil {
364369
return nil, err
365370
}
371+
cctx.Context, err = contextWithHeaders(cctx.Context, sharedWfOpts.Headers)
372+
if err != nil {
373+
return nil, err
374+
}
366375

367376
startOp := cl.NewWithStartWorkflowOperation(
368377
clStartWfOpts,
@@ -521,6 +530,12 @@ func (c *TemporalWorkflowCommand) startWorkflow(
521530
if err != nil {
522531
return nil, err
523532
}
533+
534+
cctx.Context, err = contextWithHeaders(cctx.Context, sharedWorkflowOpts.Headers)
535+
if err != nil {
536+
return nil, err
537+
}
538+
524539
run, err := cl.ExecuteWorkflow(cctx, startOpts, sharedWorkflowOpts.Type, input...)
525540
if err != nil {
526541
return nil, fmt.Errorf("failed starting workflow: %w", err)

internal/temporalcli/commands.workflow_exec_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,33 @@ func (s *SharedServerSuite) TestWorkflow_Start_StartDelay() {
104104
)
105105
}
106106

107+
func (s *SharedServerSuite) TestWorkflow_Start_With_headers() {
108+
res := s.Execute(
109+
"workflow", "start",
110+
"--address", s.Address(),
111+
"--headers", "id=123",
112+
"--task-queue", s.Worker().Options.TaskQueue,
113+
"--type", "DevWorkflow",
114+
"--workflow-id", "id123",
115+
"-i", `["val1", "val2"]`,
116+
)
117+
s.NoError(res.Err)
118+
eventIter := s.Client.GetWorkflowHistory(s.Context, "id123", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
119+
for eventIter.HasNext() {
120+
event, err := eventIter.Next()
121+
s.NoError(err)
122+
if event.EventType == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
123+
headers := event.GetWorkflowExecutionStartedEventAttributes().GetHeader()
124+
payload := headers.Fields["id"]
125+
s.NotNil(payload)
126+
var val int
127+
err := converter.GetDefaultDataConverter().FromPayload(payload, &val)
128+
s.NoError(err)
129+
s.Equal(123, val)
130+
}
131+
}
132+
}
133+
107134
func (s *SharedServerSuite) TestWorkflow_Execute_SimpleSuccess() {
108135
// Text
109136
s.Worker().OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {

internal/temporalcli/commands.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4720,6 +4720,13 @@ option-sets:
47204720
description: |
47214721
Limit batch's requests per second.
47224722
Only allowed if query is present.
4723+
- name: headers
4724+
type: string[]
4725+
description: |
4726+
Temporal workflow headers in 'KEY=VALUE' format.
4727+
Keys must be identifiers, and values must be JSON values.
4728+
May be passed multiple times to set multiple Temporal headers.
4729+
Note: These are workflow headers, not gRPC headers.
47234730
47244731
- name: shared-workflow-start
47254732
options:
@@ -4762,6 +4769,13 @@ option-sets:
47624769
Keys must be identifiers, and values must be JSON values.
47634770
For example: 'YourKey={"your": "value"}'.
47644771
Can be passed multiple times.
4772+
- name: headers
4773+
type: string[]
4774+
description: |
4775+
Temporal workflow headers in 'KEY=VALUE' format.
4776+
Keys must be identifiers, and values must be JSON values.
4777+
May be passed multiple times to set multiple Temporal headers.
4778+
Note: These are workflow headers, not gRPC headers.
47654779
- name: memo
47664780
type: string[]
47674781
description: |
@@ -4889,6 +4903,13 @@ option-sets:
48894903
description: |
48904904
Run ID.
48914905
If unset, looks for an Update against the currently-running Workflow Execution.
4906+
- name: headers
4907+
type: string[]
4908+
description: |
4909+
Temporal workflow headers in 'KEY=VALUE' format.
4910+
Keys must be identifiers, and values must be JSON values.
4911+
May be passed multiple times to set multiple Temporal headers.
4912+
Note: These are workflow headers, not gRPC headers.
48924913
48934914
- name: update-targeting
48944915
options:
@@ -4952,6 +4973,13 @@ option-sets:
49524973
enum-values:
49534974
- not_open
49544975
- not_completed_cleanly
4976+
- name: headers
4977+
type: string[]
4978+
description: |
4979+
Temporal workflow headers in 'KEY=VALUE' format.
4980+
Keys must be identifiers, and values must be JSON values.
4981+
May be passed multiple times to set multiple Temporal headers.
4982+
Note: These are workflow headers, not gRPC headers.
49554983
49564984
- name: workflow-update-options
49574985
options:

0 commit comments

Comments
 (0)