Skip to content

Commit 7025a6a

Browse files
Stop sending deployment info for session per-worker task queue (#2088)
Stop sending deployment info for session per-worker task queue
1 parent 7c0ebcf commit 7025a6a

File tree

2 files changed

+67
-2
lines changed

2 files changed

+67
-2
lines changed

internal/internal_worker.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,15 +448,26 @@ func newSessionWorker(client *WorkflowClient, params workerExecutionParameters,
448448
creationTaskqueue := getCreationTaskqueue(params.TaskQueue)
449449
params.BackgroundContext = context.WithValue(params.BackgroundContext, sessionEnvironmentContextKey, sessionEnvironment)
450450
params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue()
451+
// For the resource specific task queue, we don't need to include deployment options
452+
// Save them to restore later
453+
deployments := params.DeploymentOptions
454+
useBuildIDForVersioning := params.UseBuildIDForVersioning
455+
// Disable versioning for activity worker within session, but still send deployment name for debug purpose
456+
params.DeploymentOptions.UseVersioning = false
457+
params.UseBuildIDForVersioning = false
451458
activityWorker := newActivityWorker(client, params,
452-
&workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil)
459+
&workerOverrides{
460+
slotSupplier: params.Tuner.GetSessionActivitySlotSupplier(),
461+
}, env, nil)
453462

454463
params.ActivityTaskPollerBehavior = NewPollerBehaviorSimpleMaximum(
455464
PollerBehaviorSimpleMaximumOptions{
456465
MaximumNumberOfPollers: 1,
457466
},
458467
)
459468
params.TaskQueue = creationTaskqueue
469+
params.DeploymentOptions = deployments
470+
params.UseBuildIDForVersioning = useBuildIDForVersioning
460471
// Although we have session token bucket to limit session size across creation
461472
// and recreation, we also limit it here for creation only
462473
overrides := &workerOverrides{}
@@ -518,7 +529,6 @@ func newActivityWorker(
518529
} else {
519530
slotSupplier = params.Tuner.GetActivityTaskSlotSupplier()
520531
}
521-
522532
bwo := baseWorkerOptions{
523533
pollerRate: defaultPollerRate,
524534
slotSupplier: slotSupplier,

test/worker_deployment_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
enumspb "go.temporal.io/api/enums/v1"
1717
"go.temporal.io/api/workflowservice/v1"
1818

19+
"go.temporal.io/sdk/activity"
1920
"go.temporal.io/sdk/client"
2021
"go.temporal.io/sdk/worker"
2122
"go.temporal.io/sdk/workflow"
@@ -236,6 +237,60 @@ func (ts *WorkerDeploymentTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
236237
ts.Equal("2.0", lastBuildID)
237238
}
238239

240+
func (ts *WorkerDeploymentTestSuite) TestBuildIDWithSession() {
241+
242+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
243+
defer cancel()
244+
245+
deploymentName := "deploy-test-" + uuid.NewString()
246+
v1 := worker.WorkerDeploymentVersion{
247+
DeploymentName: deploymentName,
248+
BuildID: "1.0",
249+
}
250+
251+
worker := worker.New(ts.client, ts.taskQueueName, worker.Options{
252+
EnableSessionWorker: true,
253+
DeploymentOptions: worker.DeploymentOptions{
254+
UseVersioning: true,
255+
Version: v1,
256+
},
257+
})
258+
259+
worker.RegisterWorkflowWithOptions(ts.workflows.BasicSession, workflow.RegisterOptions{
260+
Name: "SessionBuildIDWorkflow",
261+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
262+
})
263+
264+
activities2 := &Activities2{}
265+
result := &Activities{activities2: activities2}
266+
activities2.impl = result
267+
worker.RegisterActivityWithOptions(activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
268+
269+
ts.NoError(worker.Start())
270+
defer worker.Stop()
271+
272+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
273+
274+
ts.waitForWorkerDeployment(ctx, dHandle)
275+
276+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
277+
ts.NoError(err)
278+
279+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
280+
281+
_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
282+
BuildID: v1.BuildID,
283+
ConflictToken: response1.ConflictToken,
284+
})
285+
ts.NoError(err)
286+
287+
// start workflow1 with 1.0, BasicSession, auto-upgrade
288+
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf-1"), "SessionBuildIDWorkflow")
289+
ts.NoError(err)
290+
291+
ts.NoError(wfHandle.Get(ctx, nil))
292+
}
293+
239294
func (ts *WorkerDeploymentTestSuite) TestPinnedBehaviorThreeWorkers() {
240295
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
241296
ts.T().Skip("temporal server 1.27+ required")

0 commit comments

Comments
 (0)