Skip to content

Commit 4483a59

Browse files
authored
Merge branch 'master' into memo-user-encode
2 parents bd9b603 + e6b89f1 commit 4483a59

File tree

15 files changed

+464
-58
lines changed

15 files changed

+464
-58
lines changed

.github/workflows/ci.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ on:
55
branches:
66
- master
77

8+
permissions:
9+
contents: read
10+
actions: read
11+
812
jobs:
913
build-and-test:
1014
strategy:
@@ -23,6 +27,10 @@ jobs:
2327
runsOn: macos-15-intel
2428
- os: macos-arm
2529
runsOn: macos-14
30+
permissions:
31+
contents: read
32+
checks: write
33+
statuses: write
2634
runs-on: ${{ matrix.runsOn || matrix.os }}
2735
steps:
2836
- name: Checkout repository

.github/workflows/nightly-throughput-stress.yml

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,32 @@ on:
77
push:
88
branches:
99
- add-nightly-throughput-stress-workflow
10+
- cleanup-nightly-tps-workflow
1011
workflow_dispatch:
1112
inputs:
1213
duration:
1314
description: 'Test duration (e.g., 6h, 1h)'
1415
required: false
15-
default: '6h'
16+
default: '5h'
1617
type: string
1718
timeout:
1819
description: 'Scenario timeout (should be greater than duration)'
1920
required: false
20-
default: '6h30m'
21+
default: '5h30m'
2122
type: string
2223
job_timeout_minutes:
2324
description: 'GitHub Actions job timeout in minutes'
2425
required: false
25-
default: 420
26+
default: 360
2627
type: number
2728

29+
permissions:
30+
contents: read
31+
2832
env:
2933
# Workflow configuration
30-
TEST_DURATION: ${{ inputs.duration || vars.NIGHTLY_TEST_DURATION || '6h' }}
31-
TEST_TIMEOUT: ${{ inputs.timeout || vars.NIGHTLY_TEST_TIMEOUT || '6h30m' }}
34+
TEST_DURATION: ${{ inputs.duration || vars.NIGHTLY_TEST_DURATION || '5h' }}
35+
TEST_TIMEOUT: ${{ inputs.timeout || vars.NIGHTLY_TEST_TIMEOUT || '5h30m' }}
3236

3337
# Logging and artifacts
3438
WORKER_LOG_DIR: /tmp/throughput-stress-logs
@@ -41,7 +45,10 @@ env:
4145
jobs:
4246
throughput-stress:
4347
runs-on: ubuntu-latest-4-cores
44-
timeout-minutes: ${{ fromJSON(inputs.job_timeout_minutes || (vars.NIGHTLY_JOB_TIMEOUT_MINUTES || '420')) }}
48+
timeout-minutes: ${{ fromJSON(inputs.job_timeout_minutes || (vars.NIGHTLY_JOB_TIMEOUT_MINUTES || '360')) }}
49+
permissions:
50+
contents: read
51+
actions: write
4552

4653
steps:
4754
- name: Print test configuration
@@ -93,6 +100,11 @@ jobs:
93100
working-directory: omes
94101
continue-on-error: true
95102
run: |
103+
# This makes the pipeline return the exit code of the first failing command
104+
# Otherwise the output of the `tee` command will be used
105+
# (which is troublesome when the scenario fails but the `tee` command succeeds)
106+
set -o pipefail
107+
96108
# Use run-scenario-with-worker to build and run in one step
97109
# Pass the SDK directory as --version for local testing
98110
# Note: The hardcoded values below match OMES defaults, except:
@@ -113,8 +125,6 @@ jobs:
113125
--option min-throughput-per-hour=1000 \
114126
2>&1 | tee $WORKER_LOG_DIR/scenario.log
115127
116-
echo "SCENARIO_EXIT_CODE=${PIPESTATUS[0]}" >> $GITHUB_ENV
117-
118128
- name: Upload logs on failure/cancellation
119129
if: failure() || cancelled()
120130
uses: actions/upload-artifact@v4
@@ -136,21 +146,10 @@ jobs:
136146
"type": "section",
137147
"text": {
138148
"type": "mrkdwn",
139-
"text": "*Nightly Throughput Stress Failed* :x:\n\n*Duration:* ${{ env.TEST_DURATION }}\n*Run:* <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Logs>\n*Triggered by:* ${{ github.event_name == 'schedule' && 'Scheduled' || github.actor }}"
149+
"text": "*Nightly Throughput Stress Failed* :x:\n\n*Repository:* ${{ github.repository }}\n*Duration:* ${{ env.TEST_DURATION }}\n*Run:* <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|View Logs>\n*Triggered by:* ${{ github.event_name == 'schedule' && 'Scheduled' || github.actor }}"
140150
}
141151
}
142152
]
143153
}
144154
env:
145155
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_SDK_ALERTS_WEBHOOK }}
146-
147-
- name: Fail if scenario failed
148-
if: always()
149-
run: |
150-
if [ "${SCENARIO_EXIT_CODE:-1}" != "0" ]; then
151-
echo "❌ Throughput stress test failed with exit code ${SCENARIO_EXIT_CODE}"
152-
echo "Check the artifacts for detailed logs and state"
153-
exit 1
154-
else
155-
echo "✅ Throughput stress test completed successfully"
156-
fi

.github/workflows/omes.yml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@ on:
44
branches:
55
- master
66

7+
permissions:
8+
contents: read
9+
710
jobs:
811
omes-image-build:
12+
permissions:
13+
contents: read
14+
packages: write
915
uses: temporalio/omes/.github/workflows/docker-images.yml@main
1016
secrets: inherit
1117
with:
@@ -14,4 +20,4 @@ jobs:
1420
sdk-repo-ref: ${{ github.event.pull_request.head.ref || github.ref }}
1521
# TODO: Remove once we have a good way of cleaning up sha-based pushed images
1622
docker-tag-ext: ci-latest
17-
do-push: true
23+
do-push: true

contrib/opentelemetry/tracing_interceptor.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"go.temporal.io/sdk/interceptor"
1616
"go.temporal.io/sdk/log"
17+
"go.temporal.io/sdk/temporal"
1718
)
1819

1920
// DefaultTextMapPropagator is the default OpenTelemetry TextMapPropagator used
@@ -196,8 +197,17 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto
196197
}
197198
}
198199

200+
if opts.ToHeader && opts.FromHeader {
201+
return nil, fmt.Errorf("cannot set both ToHeader and FromHeader for span")
202+
}
203+
204+
spanKind := trace.SpanKindServer
205+
if opts.ToHeader {
206+
spanKind = trace.SpanKindClient
207+
}
208+
199209
// Create span
200-
span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time))
210+
span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time), trace.WithSpanKind(spanKind))
201211

202212
// Set tags
203213
if len(opts.Tags) > 0 {
@@ -241,12 +251,19 @@ type tracerSpan struct {
241251
}
242252

243253
func (t *tracerSpan) Finish(opts *interceptor.TracerFinishSpanOptions) {
244-
if opts.Error != nil {
254+
t.RecordError(opts.Error)
255+
256+
if opts.Error != nil && !isBenignApplicationError(opts.Error) {
245257
t.SetStatus(codes.Error, opts.Error.Error())
246258
}
247259
t.End()
248260
}
249261

262+
func isBenignApplicationError(err error) bool {
263+
appError, _ := err.(*temporal.ApplicationError)
264+
return appError != nil && appError.Category() == temporal.ApplicationErrorCategoryBenign
265+
}
266+
250267
type textMapCarrier map[string]string
251268

252269
func (t textMapCarrier) Get(key string) string { return t[key] }

contrib/opentelemetry/tracing_interceptor_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ package opentelemetry_test
22

33
import (
44
"testing"
5+
"time"
56

7+
"github.com/stretchr/testify/assert"
68
"github.com/stretchr/testify/require"
9+
"go.opentelemetry.io/otel/codes"
710
sdktrace "go.opentelemetry.io/otel/sdk/trace"
811
"go.opentelemetry.io/otel/sdk/trace/tracetest"
912
"go.opentelemetry.io/otel/trace"
1013

1114
"go.temporal.io/sdk/contrib/opentelemetry"
1215
"go.temporal.io/sdk/interceptor"
1316
"go.temporal.io/sdk/internal/interceptortest"
17+
"go.temporal.io/sdk/temporal"
1418
)
1519

1620
func TestSpanPropagation(t *testing.T) {
@@ -42,3 +46,109 @@ func spanChildren(spans []sdktrace.ReadOnlySpan, parentID trace.SpanID) (ret []*
4246
}
4347
return
4448
}
49+
50+
func TestSpanKind(t *testing.T) {
51+
tests := []struct {
52+
operation string
53+
toHeader bool
54+
fromHeader bool
55+
expectedKind trace.SpanKind
56+
}{
57+
{
58+
operation: "StartWorkflow",
59+
toHeader: true,
60+
fromHeader: false,
61+
expectedKind: trace.SpanKindClient,
62+
},
63+
{
64+
operation: "RunWorkflow",
65+
toHeader: false,
66+
fromHeader: true,
67+
expectedKind: trace.SpanKindServer,
68+
},
69+
}
70+
71+
for _, tt := range tests {
72+
t.Run(tt.operation, func(t *testing.T) {
73+
rec := tracetest.NewSpanRecorder()
74+
tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{
75+
Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""),
76+
})
77+
require.NoError(t, err)
78+
79+
span, err := tracer.StartSpan(&interceptor.TracerStartSpanOptions{
80+
Operation: tt.operation,
81+
Name: "test-span",
82+
ToHeader: tt.toHeader,
83+
FromHeader: tt.fromHeader,
84+
})
85+
require.NoError(t, err)
86+
87+
span.Finish(&interceptor.TracerFinishSpanOptions{})
88+
89+
spans := rec.Ended()
90+
require.Equal(t, len(spans), 1)
91+
92+
foundSpan := spans[0]
93+
assert.Equal(t, tt.expectedKind, foundSpan.SpanKind(),
94+
"Expected span kind %v but got %v for operation %s",
95+
tt.expectedKind, foundSpan.SpanKind(), tt.operation)
96+
})
97+
}
98+
}
99+
100+
func TestBenignErrorSpanStatus(t *testing.T) {
101+
tests := []struct {
102+
name string
103+
err error
104+
expectError bool
105+
expectStatus codes.Code
106+
}{
107+
{
108+
name: "benign application error should not set error status",
109+
err: temporal.NewApplicationErrorWithOptions("benign error", "TestType", temporal.ApplicationErrorOptions{Category: temporal.ApplicationErrorCategoryBenign}),
110+
expectError: false,
111+
expectStatus: codes.Unset,
112+
},
113+
{
114+
name: "regular application error should set error status",
115+
err: temporal.NewApplicationError("regular error", "TestType"),
116+
expectError: true,
117+
expectStatus: codes.Error,
118+
},
119+
}
120+
121+
for _, tt := range tests {
122+
t.Run(tt.name, func(t *testing.T) {
123+
rec := tracetest.NewSpanRecorder()
124+
tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{
125+
Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""),
126+
})
127+
require.NoError(t, err)
128+
129+
span, err := tracer.StartSpan(&interceptor.TracerStartSpanOptions{
130+
Operation: "TestOperation",
131+
Name: "TestSpan",
132+
Time: time.Now(),
133+
})
134+
require.NoError(t, err)
135+
136+
span.Finish(&interceptor.TracerFinishSpanOptions{
137+
Error: tt.err,
138+
})
139+
140+
// Check recorded spans
141+
spans := rec.Ended()
142+
require.Len(t, spans, 1)
143+
144+
recordedSpan := spans[0]
145+
assert.Equal(t, tt.expectStatus, recordedSpan.Status().Code)
146+
147+
if tt.expectError {
148+
assert.NotEmpty(t, recordedSpan.Status().Description)
149+
} else {
150+
assert.Empty(t, recordedSpan.Status().Description)
151+
}
152+
})
153+
}
154+
}

contrib/tools/workflowcheck/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ In addition to those functions/vars, the following Go source constructs are cons
145145
* Iterating over a channel via `range`
146146
* Iterating over a map via `range`
147147

148-
Many constructs that are known to be non-deterministic, such as mutating a global variable, are not able to be reliably
148+
Many constructs that are known to be non-deterministic, such as mutating a global variable, cannot be reliably
149149
distinguished from deterministic use in common cases. This tool intentionally does not flag them.
150150

151151
In some cases, functions that are considered non-deterministic are commonly used in ways that only follow a

internal/internal_worker.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,15 +448,26 @@ func newSessionWorker(client *WorkflowClient, params workerExecutionParameters,
448448
creationTaskqueue := getCreationTaskqueue(params.TaskQueue)
449449
params.BackgroundContext = context.WithValue(params.BackgroundContext, sessionEnvironmentContextKey, sessionEnvironment)
450450
params.TaskQueue = sessionEnvironment.GetResourceSpecificTaskqueue()
451+
// For the resource specific task queue, we don't need to include deployment options
452+
// Save them to restore later
453+
deployments := params.DeploymentOptions
454+
useBuildIDForVersioning := params.UseBuildIDForVersioning
455+
// Disable versioning for activity worker within session, but still send deployment name for debug purpose
456+
params.DeploymentOptions.UseVersioning = false
457+
params.UseBuildIDForVersioning = false
451458
activityWorker := newActivityWorker(client, params,
452-
&workerOverrides{slotSupplier: params.Tuner.GetSessionActivitySlotSupplier()}, env, nil)
459+
&workerOverrides{
460+
slotSupplier: params.Tuner.GetSessionActivitySlotSupplier(),
461+
}, env, nil)
453462

454463
params.ActivityTaskPollerBehavior = NewPollerBehaviorSimpleMaximum(
455464
PollerBehaviorSimpleMaximumOptions{
456465
MaximumNumberOfPollers: 1,
457466
},
458467
)
459468
params.TaskQueue = creationTaskqueue
469+
params.DeploymentOptions = deployments
470+
params.UseBuildIDForVersioning = useBuildIDForVersioning
460471
// Although we have session token bucket to limit session size across creation
461472
// and recreation, we also limit it here for creation only
462473
overrides := &workerOverrides{}
@@ -518,7 +529,6 @@ func newActivityWorker(
518529
} else {
519530
slotSupplier = params.Tuner.GetActivityTaskSlotSupplier()
520531
}
521-
522532
bwo := baseWorkerOptions{
523533
pollerRate: defaultPollerRate,
524534
slotSupplier: slotSupplier,

internal/internal_worker_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -885,7 +885,7 @@ func newScalableTaskPoller(
885885
}
886886
switch p := pollerBehavior.(type) {
887887
case *pollerBehaviorAutoscaling:
888-
tw.pollerCount = p.initialNumberOfPollers
888+
tw.pollerCount = p.maximumNumberOfPollers
889889
tw.pollerSemaphore = newPollerSemaphore(p.initialNumberOfPollers)
890890
tw.pollerAutoscalerReportHandle = newPollScalerReportHandle(pollScalerReportHandleOptions{
891891
initialPollerCount: p.initialNumberOfPollers,

0 commit comments

Comments
 (0)