diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index ebe4ad9d1..2cc539ec7 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index bf81a5e48..3597cfa76 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -233,8 +233,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 8230e121e..1b2f7887f 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index c106af2b4..efd239bb2 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -37,8 +37,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index b97256862..a2021e842 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index f2a40b0ac..5411c514b 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -50,8 +50,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index eb81bc31e..8dddbabef 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 0f7793daf..348077ad4 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -40,8 +40,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/sysinfo/go.mod b/contrib/sysinfo/go.mod index f773fddc5..eb6a90c62 100644 --- a/contrib/sysinfo/go.mod +++ b/contrib/sysinfo/go.mod @@ -35,7 +35,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/sysinfo/go.sum b/contrib/sysinfo/go.sum index 1e3c9a3d0..5e24a10fd 100644 --- a/contrib/sysinfo/go.sum +++ b/contrib/sysinfo/go.sum @@ -71,8 +71,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index 4be9a9bf1..e9bee9ac5 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -23,7 +23,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 2f9d8bdb7..801023637 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -119,8 +119,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index fa7a74559..54af3aaff 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.1 + go.temporal.io/api v1.62.2 golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index d5ce6c317..2863c510b 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index ff31349d1..835d262d9 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -24,7 +24,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.62.1 // indirect + go.temporal.io/api v1.62.2 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.39.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index 15d1bcdb6..3e16546ad 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -55,8 +55,8 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/error.go b/internal/error.go index 473410950..bfabb811d 100644 --- a/internal/error.go +++ b/internal/error.go @@ -210,6 +210,11 @@ type ( // Deprecated: Use Worker Deployment Versioning instead. See https://docs.temporal.io/worker-versioning VersioningIntent VersioningIntent + // InitialVersioningBehavior specifies the versioning behavior that the first task of the new run should use. + // For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version of the previous run. + // NOTE: Upgrade-on-Continue-as-New is currently experimental. + InitialVersioningBehavior ContinueAsNewVersioningBehavior + // This is by default nil but may be overridden using NewContinueAsNewErrorWithOptions. // It specifies the retry policy which gets carried over to the next run. // If not set, the current workflow's retry policy will be carried over automatically. @@ -229,6 +234,11 @@ type ( // RetryPolicy specifies the retry policy to be used for the next run. // If nil, the current workflow's retry policy will be used. RetryPolicy *RetryPolicy + + // InitialVersioningBehavior specifies the versioning behavior that the first task of the new run should use. + // For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version of the previous run. + // NOTE: Upgrade-on-Continue-as-New is currently experimental. + InitialVersioningBehavior ContinueAsNewVersioningBehavior } // UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist @@ -594,6 +604,7 @@ func NewContinueAsNewErrorWithOptions(ctx Context, options ContinueAsNewErrorOpt if options.RetryPolicy != nil { continueAsNewErr.RetryPolicy = options.RetryPolicy } + continueAsNewErr.InitialVersioningBehavior = options.InitialVersioningBehavior } return err @@ -621,15 +632,16 @@ func (wc *workflowEnvironmentInterceptor) NewContinueAsNewError( } return &ContinueAsNewError{ - WorkflowType: workflowType, - Input: input, - Header: header, - TaskQueueName: options.TaskQueueName, - WorkflowExecutionTimeout: options.WorkflowExecutionTimeout, - WorkflowRunTimeout: options.WorkflowRunTimeout, - WorkflowTaskTimeout: options.WorkflowTaskTimeout, - VersioningIntent: options.VersioningIntent, - RetryPolicy: nil, // The retry policy can't be propagated like other options due to #676. + WorkflowType: workflowType, + Input: input, + Header: header, + TaskQueueName: options.TaskQueueName, + WorkflowExecutionTimeout: options.WorkflowExecutionTimeout, + WorkflowRunTimeout: options.WorkflowRunTimeout, + WorkflowTaskTimeout: options.WorkflowTaskTimeout, + VersioningIntent: options.VersioningIntent, + RetryPolicy: nil, // The retry policy can't be propagated like other options due to #676. + InitialVersioningBehavior: options.InitialVersioningBehavior, } } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 10be44baa..641476701 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1205,6 +1205,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( // Update workflow info fields weh.workflowInfo.currentHistoryLength = int(event.EventId) weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() + weh.workflowInfo.continueAsNewSuggestedReasons = convertContinueAsNewSuggestedReasonsFromProto( + event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNewReasons(), + ) + weh.workflowInfo.targetWorkerDeploymentVersionChanged = event.GetWorkflowTaskStartedEventAttributes().GetTargetWorkerDeploymentVersionChanged() weh.workflowInfo.currentHistorySize = int(event.GetWorkflowTaskStartedEventAttributes().GetHistorySizeBytes()) // Reset the counter on command helper used for generating ID for commands weh.commandsHelper.setCurrentWorkflowTaskStartedEventID(event.GetEventId()) @@ -2160,3 +2164,13 @@ func (weh *workflowExecutionEventHandlerImpl) protocolConstructorForMessage( } return nil, fmt.Errorf("unsupported protocol: %v", protoName) } + +func convertContinueAsNewSuggestedReasonsFromProto( + reasons []enumspb.SuggestContinueAsNewReason, +) []ContinueAsNewSuggestedReason { + converted := make([]ContinueAsNewSuggestedReason, 0, len(reasons)) + for _, reason := range reasons { + converted = append(converted, ContinueAsNewSuggestedReason(reason)) + } + return converted +} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index c23bfda86..5979ad95a 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1884,16 +1884,17 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( useCompat := determineInheritBuildIdFlagForCommand( contErr.VersioningIntent, workflowContext.workflowInfo.TaskQueueName, contErr.TaskQueueName) closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ - WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name}, - Input: contErr.Input, - TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, - WorkflowRunTimeout: durationpb.New(contErr.WorkflowRunTimeout), - WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout), - Header: contErr.Header, - Memo: workflowContext.workflowInfo.Memo, - SearchAttributes: workflowContext.workflowInfo.SearchAttributes, - RetryPolicy: convertToPBRetryPolicy(retryPolicy), - InheritBuildId: useCompat, + WorkflowType: &commonpb.WorkflowType{Name: contErr.WorkflowType.Name}, + Input: contErr.Input, + TaskQueue: &taskqueuepb.TaskQueue{Name: contErr.TaskQueueName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + WorkflowRunTimeout: durationpb.New(contErr.WorkflowRunTimeout), + WorkflowTaskTimeout: durationpb.New(contErr.WorkflowTaskTimeout), + Header: contErr.Header, + Memo: workflowContext.workflowInfo.Memo, + SearchAttributes: workflowContext.workflowInfo.SearchAttributes, + RetryPolicy: convertToPBRetryPolicy(retryPolicy), + InheritBuildId: useCompat, + InitialVersioningBehavior: continueAsNewVersioningBehaviorToProto(contErr.InitialVersioningBehavior), }} } else if workflowContext.err != nil { // Workflow failures diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 178a3791c..1571f4fda 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -210,8 +210,9 @@ type ( queryHandlers map[string]*queryHandler updateHandlers map[string]*updateHandler // runningUpdatesHandles is a map of update handlers that are currently running. - runningUpdatesHandles map[string]UpdateInfo - VersioningIntent VersioningIntent + runningUpdatesHandles map[string]UpdateInfo + VersioningIntent VersioningIntent + InitialVersioningBehavior ContinueAsNewVersioningBehavior // currentDetails is the user-set string returned on metadata query as // WorkflowMetadata.current_details currentDetails string diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 17c80b5bc..221d89ad3 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -430,6 +430,14 @@ func (env *testWorkflowEnvironmentImpl) setContinueAsNewSuggested(suggest bool) env.workflowInfo.continueAsNewSuggested = suggest } +func (env *testWorkflowEnvironmentImpl) setContinueAsNewSuggestedReasons(reasons []ContinueAsNewSuggestedReason) { + env.workflowInfo.continueAsNewSuggestedReasons = reasons +} + +func (env *testWorkflowEnvironmentImpl) setTargetWorkerDeploymentVersionChanged(changed bool) { + env.workflowInfo.targetWorkerDeploymentVersionChanged = changed +} + func (env *testWorkflowEnvironmentImpl) setContinuedExecutionRunID(rid string) { env.workflowInfo.ContinuedExecutionRunID = rid } diff --git a/internal/workflow.go b/internal/workflow.go index a10b8274a..a7784b3b2 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -67,6 +67,63 @@ const ( VersioningBehaviorAutoUpgrade ) +// ContinueAsNewVersioningBehavior specifies how the new workflow run after ContinueAsNew should change its Build ID. +// +// NOTE: Upgrade-on-Continue-as-New is currently experimental. +// +// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehavior] +type ContinueAsNewVersioningBehavior int + +const ( + // ContinueAsNewVersioningBehaviorUnspecified - Workflow versioning policy unknown. + // If the source workflow was AutoUpgrade, the new workflow will start as AutoUpgrade. + // If the source workflow was Pinned, the new workflow will start Pinned to the same Build ID. + // If the source workflow had a Pinned Versioning Override, the new workflow will inherit that Versioning Override. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUnspecified] + ContinueAsNewVersioningBehaviorUnspecified = iota + + // ContinueAsNewVersioningBehaviorAutoUpgrade - Start the new workflow with AutoUpgrade versioning behavior. + // Like all AutoUpgrade workflows, use the Target Version of the workflow's task queue at start-time. After the + // first workflow task completes, use whatever Versioning Behavior the workflow is annotated with in the workflow + // code. + // + // Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow + // run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade] + ContinueAsNewVersioningBehaviorAutoUpgrade = 1 +) + +// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time. +// +// NOTE: ContinueAsNewSuggestedReasons are currently experimental. +// +// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReason] +type ContinueAsNewSuggestedReason int + +const ( + // ContinueAsNewSuggestedReasonUnspecified - The reason is unknown. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonUnspecified] + ContinueAsNewSuggestedReasonUnspecified = iota + + // ContinueAsNewSuggestedReasonHistorySizeTooLarge - Workflow History size is getting too large. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonHistorySizeTooLarge] + ContinueAsNewSuggestedReasonHistorySizeTooLarge = 1 + + // ContinueAsNewSuggestedReasonTooManyHistoryEvents - Workflow History is getting too long. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonTooManyHistoryEvents] + ContinueAsNewSuggestedReasonTooManyHistoryEvents = 2 + + // ContinueAsNewSuggestedReasonTooManyUpdates - Workflow's count of completed plus in-flight updates is too large. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewSuggestedReasonTooManyUpdates] + ContinueAsNewSuggestedReasonTooManyUpdates = 3 +) + // NexusOperationCancellationType specifies what action should be taken for a Nexus operation when the // caller is cancelled. // @@ -1389,9 +1446,13 @@ type WorkflowInfo struct { // this worker currentTaskBuildID string - continueAsNewSuggested bool - currentHistorySize int - currentHistoryLength int + continueAsNewSuggested bool + continueAsNewSuggestedReasons []ContinueAsNewSuggestedReason + + targetWorkerDeploymentVersionChanged bool + + currentHistorySize int + currentHistoryLength int // currentRunID is the current run ID of the workflow task, deterministic over reset currentRunID string } @@ -1443,6 +1504,23 @@ func (wInfo *WorkflowInfo) GetContinueAsNewSuggested() bool { return wInfo.continueAsNewSuggested } +// GetContinueAsNewSuggestedReasons returns a list of reasons why continue as new is suggested, +// if the server is configured to suggest continue as new and it is suggested. +// This value may change throughout the life of the workflow. +// +// Note: ContinueAsNewSuggestedReasons are currently experimental. +func (wInfo *WorkflowInfo) GetContinueAsNewSuggestedReasons() []ContinueAsNewSuggestedReason { + return wInfo.continueAsNewSuggestedReasons +} + +// GetTargetWorkerDeploymentVersionChanged returns whether the target worker deployment +// version has changed. +// +// Note: Upgrade-on-Continue-as-New is currently experimental. +func (wInfo *WorkflowInfo) GetTargetWorkerDeploymentVersionChanged() bool { + return wInfo.targetWorkerDeploymentVersionChanged +} + // GetWorkflowInfo extracts info of a current workflow from a context. // // Exposed as: [go.temporal.io/sdk/workflow.GetInfo] @@ -1455,6 +1533,7 @@ func (wc *workflowEnvironmentInterceptor) GetInfo(ctx Context) *WorkflowInfo { return wc.env.WorkflowInfo() } +// // Exposed as: [go.temporal.io/sdk/workflow.GetTypedSearchAttributes] func GetTypedSearchAttributes(ctx Context) SearchAttributes { i := getWorkflowOutboundInterceptor(ctx) @@ -3010,3 +3089,14 @@ func versioningBehaviorToProto(t VersioningBehavior) enumspb.VersioningBehavior panic("unknown versioning behavior type") } } + +func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) enumspb.ContinueAsNewVersioningBehavior { + switch t { + case ContinueAsNewVersioningBehaviorUnspecified: + return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED + case ContinueAsNewVersioningBehaviorAutoUpgrade: + return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE + default: + panic("unknown continue-as-new versioning behavior type") + } +} diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index fc6dc8810..3d15a2c59 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -353,7 +353,7 @@ func (e *TestWorkflowEnvironment) SetCurrentHistorySize(length int) { e.impl.setCurrentHistorySize(length) } -// SetContinueAsNewSuggested set sets the value that is returned from +// SetContinueAsNewSuggested sets the value that is returned from // GetInfo(ctx).GetContinueAsNewSuggested(). // // Note: this value may not be up to date if accessed inside a query. @@ -361,6 +361,22 @@ func (e *TestWorkflowEnvironment) SetContinueAsNewSuggested(suggest bool) { e.impl.setContinueAsNewSuggested(suggest) } +// SetContinueAsNewSuggestedReasons sets the value that is returned from +// GetInfo(ctx).GetContinueAsNewSuggestedReasons(). +// +// Note: this value may not be up to date if accessed inside a query. +func (e *TestWorkflowEnvironment) SetContinueAsNewSuggestedReasons(reasons []ContinueAsNewSuggestedReason) { + e.impl.setContinueAsNewSuggestedReasons(reasons) +} + +// SetTargetWorkerDeploymentVersionChanged sets the value that is returned from +// GetInfo(ctx).GetTargetWorkerDeploymentVersionChanged. +// +// Note: this value may not be up to date if accessed inside a query. +func (e *TestWorkflowEnvironment) SetTargetWorkerDeploymentVersionChanged(changed bool) { + e.impl.setTargetWorkerDeploymentVersionChanged(changed) +} + // SetContinuedExecutionRunID sets the value that is returned from // GetInfo(ctx).ContinuedExecutionRunID func (e *TestWorkflowEnvironment) SetContinuedExecutionRunID(rid string) { diff --git a/test/go.mod b/test/go.mod index a3b0c9920..0a4d3a929 100644 --- a/test/go.mod +++ b/test/go.mod @@ -15,7 +15,7 @@ require ( go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 - go.temporal.io/api v1.62.1 + go.temporal.io/api v1.62.2 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 48659a77d..416a9e617 100644 --- a/test/go.sum +++ b/test/go.sum @@ -170,8 +170,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= -go.temporal.io/api v1.62.1 h1:7UHMNOIqfYBVTaW0JIh/wDpw2jORkB6zUKsxGtvjSZU= -go.temporal.io/api v1.62.1/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= +go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index 7e449afe3..ccf560244 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -64,7 +64,7 @@ func (ts *WorkerDeploymentTestSuite) waitForWorkerDeployment(ctx context.Context ts.Eventually(func() bool { _, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) return err == nil - }, 10*time.Second, 300*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) } func (ts *WorkerDeploymentTestSuite) waitForWorkerDeploymentVersion( @@ -83,7 +83,39 @@ func (ts *WorkerDeploymentTestSuite) waitForWorkerDeploymentVersion( } } return false - }, 10*time.Second, 300*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) +} + +func (ts *WorkerDeploymentTestSuite) waitForWorkerDeploymentRoutingConfigPropagation( + ctx context.Context, + deploymentName string, + expectedCurrentBuildID string, + expectedRampingBuildID string, +) { + ts.Eventually(func() bool { + resp, err := ts.client.WorkflowService().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{ + Namespace: ts.config.Namespace, + DeploymentName: deploymentName, + }) + if err != nil { + return false + } + if resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetCurrentDeploymentVersion().GetBuildId() != expectedCurrentBuildID { + return false + } + if resp.GetWorkerDeploymentInfo().GetRoutingConfig().GetRampingDeploymentVersion().GetBuildId() != expectedRampingBuildID { + return false + } + switch resp.GetWorkerDeploymentInfo().GetRoutingConfigUpdateState() { + case enumspb.ROUTING_CONFIG_UPDATE_STATE_COMPLETED: + return true + case enumspb.ROUTING_CONFIG_UPDATE_STATE_UNSPECIFIED: + return true // not implemented + case enumspb.ROUTING_CONFIG_UPDATE_STATE_IN_PROGRESS: + return false + } + return false + }, 5*time.Second, 100*time.Millisecond) } func (ts *WorkerDeploymentTestSuite) waitForWorkflowRunning(ctx context.Context, handle client.WorkflowRun) { @@ -92,7 +124,23 @@ func (ts *WorkerDeploymentTestSuite) waitForWorkflowRunning(ctx context.Context, ts.NoError(err) status := describeResp.WorkflowExecutionInfo.Status return enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING == status - }, 10*time.Second, 300*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) +} + +func (ts *WorkerDeploymentTestSuite) waitForWorkflowRunningOnVersion(ctx context.Context, handle client.WorkflowRun, expectedBuildID string) { + ts.Eventually(func() bool { + describeResp, err := ts.client.DescribeWorkflowExecution(ctx, handle.GetID(), handle.GetRunID()) + ts.NoError(err) + if status := describeResp.WorkflowExecutionInfo.Status; status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { + ts.T().Logf("workflow status: %v", status) + return false + } + if describeResp.WorkflowExecutionInfo.GetVersioningInfo().GetDeploymentVersion().GetBuildId() != expectedBuildID { + ts.T().Logf("workflow version build id: %v", describeResp.WorkflowExecutionInfo.GetVersioningInfo().GetDeploymentVersion().GetBuildId()) + return false + } + return true + }, 5*time.Second, 100*time.Millisecond) } func (ts *WorkerDeploymentTestSuite) waitForDrainage(ctx context.Context, dHandle client.WorkerDeploymentHandle, buildID string, target client.WorkerDeploymentVersionDrainageStatus) { @@ -792,7 +840,7 @@ func (ts *WorkerDeploymentTestSuite) TestListDeployments() { } sort.Strings(res) return reflect.DeepEqual(res, []string{deploymentName1, deploymentName2}) - }, 10*time.Second, 300*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) } func (ts *WorkerDeploymentTestSuite) TestDeploymentDrainage() { @@ -1045,7 +1093,7 @@ func (ts *WorkerDeploymentTestSuite) TestRampVersions() { // very likely probability (1-2^33) of success ts.Eventually(func() bool { return !ts.runWorkflowAndCheckV1(ctx, uuid.NewString()) - }, 10*time.Second, 300*time.Millisecond) + }, 5*time.Second, 100*time.Millisecond) } func (ts *WorkerDeploymentTestSuite) TestRampVersion_AllowNoPollers() { @@ -1282,3 +1330,106 @@ func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() { return true }, 305*time.Second, 1000*time.Millisecond) } + +func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() { + if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { + ts.T().Skip("temporal server 1.27+ required") + } + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + v2 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "2.0", + } + + // Two workers: + // 1.0) and 2.0) both with no default versioning behavior + // SetCurrent to 1.0 + // Workflow (annotated as Pinned): + // - Start and poll for GetTargetWorkerDeploymentVersionChanged() + // - If target version changed, continue as new with AutoUpgrade behavior + // Verify workflow returns 2.0. + + // Define v1.0 of the workflow and start polling + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithVersionUpgradeV1, workflow.RegisterOptions{ + Name: "ContinueAsNewWithVersionUpgrade", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + ts.NoError(worker1.Start()) + defer worker1.Stop() + + // Define v2.0 of the workflow and start polling + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v2, + }, + }) + worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithVersionUpgradeV2, workflow.RegisterOptions{ + Name: "ContinueAsNewWithVersionUpgrade", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + ts.NoError(worker2.Start()) + defer worker2.Stop() + + // Wait for the deployment to be ready + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + ts.waitForWorkerDeployment(ctx, dHandle) + response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + + // Wait for version 1.0 to be ready + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1) + + // Set version 1.0 as current + response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: response1.ConflictToken, + }) + ts.NoError(err) + + // Wait for v1.0-as-Current Deployment Routing Config to be propagated to all task queues + ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "") + + // Start workflow + wfHandle, err := ts.client.ExecuteWorkflow( + ctx, + ts.startWorkflowOptions("test-continueasnew-with-version-upgrade"), + "ContinueAsNewWithVersionUpgrade", + 0, + ) + ts.NoError(err) + + // Wait for workflow to complete one WFT on v1.0 + ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID) + + // Wait for version 2.0 to be ready + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2) + + // Set version 2.0 as current + _, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v2.BuildID, + ConflictToken: response2.ConflictToken, + }) + ts.NoError(err) + + // Wait for v2.0-as-Current Deployment Routing Config to be propagated to all task queues + ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v2.BuildID, "") + + // Expect workflow to return "v2.0", indicating that it continued-as-new and completed on v2 + var result string + ts.NoError(wfHandle.Get(ctx, &result)) + ts.Equal("v2.0", result) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index a0668b4dd..fdf0dc677 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -591,6 +591,50 @@ func (w *Workflows) ContinueAsNewWithRetryPolicy( ) } +func (w *Workflows) ContinueAsNewWithVersionUpgradeV1( + ctx workflow.Context, + attempt int, +) (string, error) { + if attempt > 0 { + return "v1.0", nil + } + + // Check GetTargetWorkerDeploymentVersionChanged periodically. + // TargetWorkerDeploymentVersionChanged is refreshed after each WFT completes. + for { + // Trigger a WFT when timer expires, thereby refreshing the TargetWorkerDeploymentVersionChanged flag. + // Since this is just a test workflow, we aren't doing any real work. In a real workflow regularly + // doing non-sleep workflow tasks, you would not need to artificially trigger a WFT to refresh the + // TargetWorkerDeploymentVersionChanged flag. You could choose to check the field periodically, or you + // might want to check before accepting updates, starting activities, or starting child workflows. + err := workflow.Sleep(ctx, 10*time.Millisecond) + if err != nil { + return "", err + } + info := workflow.GetInfo(ctx) + if info.GetTargetWorkerDeploymentVersionChanged() { + return "", workflow.NewContinueAsNewErrorWithOptions( + ctx, + workflow.ContinueAsNewErrorOptions{ + // Pass InitialVersioningBehavior=workflow.ContinueAsNewVersioningBehaviorAutoUpgrade + // to make the new run start with AutoUpgrade behavior and use the Target Version of + // its Worker Deployment. + InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorAutoUpgrade, + }, + "ContinueAsNewWithVersionUpgrade", + attempt+1, + ) + } + } +} + +func (w *Workflows) ContinueAsNewWithVersionUpgradeV2( + ctx workflow.Context, + attempt int, +) (string, error) { + return "v2.0", nil +} + func (w *Workflows) ContinueAsNewWithChildWF( ctx workflow.Context, iterations int, diff --git a/workflow/workflow.go b/workflow/workflow.go index d602faa5a..215681161 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -26,6 +26,47 @@ const ( VersioningBehaviorAutoUpgrade = internal.VersioningBehaviorAutoUpgrade ) +// ContinueAsNewVersioningBehavior specifies how the new workflow run after ContinueAsNew should change its Build ID. +// +// NOTE: Upgrade-on-Continue-as-New is currently experimental. +type ContinueAsNewVersioningBehavior = internal.ContinueAsNewVersioningBehavior + +const ( + // ContinueAsNewVersioningBehaviorUnspecified - Workflow versioning policy unknown. + // If the source workflow was AutoUpgrade, the new workflow will start as AutoUpgrade. + // If the source workflow was Pinned, the new workflow will start Pinned to the same Build ID. + // If the source workflow had a Pinned Versioning Override, the new workflow will inherit that Versioning Override. + ContinueAsNewVersioningBehaviorUnspecified = internal.ContinueAsNewVersioningBehaviorUnspecified + + // ContinueAsNewVersioningBehaviorAutoUpgrade - Start the new workflow with AutoUpgrade versioning behavior. + // Like all AutoUpgrade workflows, use the Target Version of the workflow's task queue at start-time. After the + // first workflow task completes, use whatever Versioning Behavior the workflow is annotated with in the workflow + // code. + // + // Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow + // run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command. + ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade +) + +// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time. +// +// NOTE: ContinueAsNewSuggestedReasons are currently experimental. +type ContinueAsNewSuggestedReason = internal.ContinueAsNewSuggestedReason + +const ( + // ContinueAsNewSuggestedReasonUnspecified - The reason is unknown. + ContinueAsNewSuggestedReasonUnspecified = internal.ContinueAsNewSuggestedReasonUnspecified + + // ContinueAsNewSuggestedReasonHistorySizeTooLarge - Workflow History size is getting too large. + ContinueAsNewSuggestedReasonHistorySizeTooLarge = internal.ContinueAsNewSuggestedReasonHistorySizeTooLarge + + // ContinueAsNewSuggestedReasonTooManyHistoryEvents - Workflow History size is getting too large. + ContinueAsNewSuggestedReasonTooManyHistoryEvents = internal.ContinueAsNewSuggestedReasonTooManyHistoryEvents + + // ContinueAsNewSuggestedReasonTooManyUpdates - Workflow's count of completed plus in-flight updates is too large. + ContinueAsNewSuggestedReasonTooManyUpdates = internal.ContinueAsNewSuggestedReasonTooManyUpdates +) + // HandlerUnfinishedPolicy defines the actions taken when a workflow exits while update handlers are // running. The workflow exit may be due to successful return, failure, cancellation, or // continue-as-new.