Skip to content

Commit 13d131c

Browse files
authored
Merge branch 'master' into upgrade-dd-trace-go-to-v2
2 parents 37c6aa1 + 87543ca commit 13d131c

File tree

5 files changed

+74
-4
lines changed

5 files changed

+74
-4
lines changed

contrib/tools/workflowcheck/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ In addition to those functions/vars, the following Go source constructs are cons
145145
* Iterating over a channel via `range`
146146
* Iterating over a map via `range`
147147

148-
Many constructs that are known to be non-deterministic, such as mutating a global variable, are not able to be reliably
148+
Many constructs that are known to be non-deterministic, such as mutating a global variable, cannot be reliably
149149
distinguished from deterministic use in common cases. This tool intentionally does not flag them.
150150

151151
In some cases, functions that are considered non-deterministic are commonly used in ways that only follow a

internal/internal_worker.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,15 +448,26 @@ func newSessionWorker(client *WorkflowClient, params workerExecutionParameters,
448448
creationTaskqueue := getCreationTaskqueue(params.TaskQueue)
449449
params.BackgroundContext = context.WithValue(params.BackgroundContext, sessionEnvironmentContextKey, sessionEnvironment)
450450
params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue()
451+
// For the resource specific task queue, we don't need to include deployment options
452+
// Save them to restore later
453+
deployments := params.DeploymentOptions
454+
useBuildIDForVersioning := params.UseBuildIDForVersioning
455+
// Disable versioning for activity worker within session, but still send deployment name for debug purpose
456+
params.DeploymentOptions.UseVersioning = false
457+
params.UseBuildIDForVersioning = false
451458
activityWorker := newActivityWorker(client, params,
452-
&workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil)
459+
&workerOverrides{
460+
slotSupplier: params.Tuner.GetSessionActivitySlotSupplier(),
461+
}, env, nil)
453462

454463
params.ActivityTaskPollerBehavior = NewPollerBehaviorSimpleMaximum(
455464
PollerBehaviorSimpleMaximumOptions{
456465
MaximumNumberOfPollers: 1,
457466
},
458467
)
459468
params.TaskQueue = creationTaskqueue
469+
params.DeploymentOptions = deployments
470+
params.UseBuildIDForVersioning = useBuildIDForVersioning
460471
// Although we have session token bucket to limit session size across creation
461472
// and recreation, we also limit it here for creation only
462473
overrides := &workerOverrides{}
@@ -518,7 +529,6 @@ func newActivityWorker(
518529
} else {
519530
slotSupplier = params.Tuner.GetActivityTaskSlotSupplier()
520531
}
521-
522532
bwo := baseWorkerOptions{
523533
pollerRate: defaultPollerRate,
524534
slotSupplier: slotSupplier,

internal/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const (
88
// Server validates if SDKVersion fits its supported range and rejects request if it doesn't.
99
//
1010
// Exposed as: [go.temporal.io/sdk/temporal.SDKVersion]
11-
SDKVersion = "1.37.0"
11+
SDKVersion = "1.38.0"
1212

1313
// SDKName represents the name of the SDK.
1414
SDKName = clientNameHeaderValue

temporalnexus/operation.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ type WorkflowHandle[T any] interface {
219219
// Link to the WorkflowExecutionStarted event of the workflow represented by this handle.
220220
link() nexus.Link
221221
token() string // Cached operation token
222+
223+
// typeMarker is a no-op method to associate the generic type T with the interface.
224+
typeMarker(T)
222225
}
223226

224227
type workflowHandle[T any] struct {
@@ -259,6 +262,8 @@ func (h workflowHandle[T]) token() string {
259262
return h.cachedToken
260263
}
261264

265+
func (h workflowHandle[T]) typeMarker(T) {}
266+
262267
// ExecuteWorkflow starts a workflow run for a [WorkflowRunOperationOptions] Handler, linking the execution chain to a
263268
// Nexus operation (subsequent runs started from continue-as-new and retries).
264269
// Automatically propagates the callback and request ID from the nexus options to the workflow.

test/worker_deployment_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
enumspb "go.temporal.io/api/enums/v1"
1717
"go.temporal.io/api/workflowservice/v1"
1818

19+
"go.temporal.io/sdk/activity"
1920
"go.temporal.io/sdk/client"
2021
"go.temporal.io/sdk/worker"
2122
"go.temporal.io/sdk/workflow"
@@ -236,6 +237,60 @@ func (ts *WorkerDeploymentTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
236237
ts.Equal("2.0", lastBuildID)
237238
}
238239

240+
func (ts *WorkerDeploymentTestSuite) TestBuildIDWithSession() {
241+
242+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
243+
defer cancel()
244+
245+
deploymentName := "deploy-test-" + uuid.NewString()
246+
v1 := worker.WorkerDeploymentVersion{
247+
DeploymentName: deploymentName,
248+
BuildID: "1.0",
249+
}
250+
251+
worker := worker.New(ts.client, ts.taskQueueName, worker.Options{
252+
EnableSessionWorker: true,
253+
DeploymentOptions: worker.DeploymentOptions{
254+
UseVersioning: true,
255+
Version: v1,
256+
},
257+
})
258+
259+
worker.RegisterWorkflowWithOptions(ts.workflows.BasicSession, workflow.RegisterOptions{
260+
Name: "SessionBuildIDWorkflow",
261+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
262+
})
263+
264+
activities2 := &Activities2{}
265+
result := &Activities{activities2: activities2}
266+
activities2.impl = result
267+
worker.RegisterActivityWithOptions(activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
268+
269+
ts.NoError(worker.Start())
270+
defer worker.Stop()
271+
272+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
273+
274+
ts.waitForWorkerDeployment(ctx, dHandle)
275+
276+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
277+
ts.NoError(err)
278+
279+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
280+
281+
_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
282+
BuildID: v1.BuildID,
283+
ConflictToken: response1.ConflictToken,
284+
})
285+
ts.NoError(err)
286+
287+
// start workflow1 with 1.0, BasicSession, auto-upgrade
288+
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf-1"), "SessionBuildIDWorkflow")
289+
ts.NoError(err)
290+
291+
ts.NoError(wfHandle.Get(ctx, nil))
292+
}
293+
239294
func (ts *WorkerDeploymentTestSuite) TestPinnedBehaviorThreeWorkers() {
240295
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
241296
ts.T().Skip("temporal server 1.27+ required")

0 commit comments

Comments
 (0)