From 32efcc87cfbdf8b256bdcd2c36f02425e20c5081 Mon Sep 17 00:00:00 2001 From: wangbill Date: Sat, 21 Mar 2026 00:08:44 -0700 Subject: [PATCH 1/6] Add ContinueAsNewOptions with NewVersion support for orchestration version migration --- src/Abstractions/ContinueAsNewOptions.cs | 21 ++++ src/Abstractions/TaskOrchestrationContext.cs | 31 +++++- src/Abstractions/TaskOrchestrator.cs | 2 +- .../Shims/TaskOrchestrationContextWrapper.cs | 21 +++- .../OrchestrationPatterns.cs | 30 ++++++ .../TaskOrchestrationContextWrapperTests.cs | 97 +++++++++++++++++++ 6 files changed, 194 insertions(+), 8 deletions(-) create mode 100644 src/Abstractions/ContinueAsNewOptions.cs diff --git a/src/Abstractions/ContinueAsNewOptions.cs b/src/Abstractions/ContinueAsNewOptions.cs new file mode 100644 index 000000000..9e5689e4f --- /dev/null +++ b/src/Abstractions/ContinueAsNewOptions.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Options for . +/// +public class ContinueAsNewOptions +{ + /// + /// Gets or sets the new version for the restarted orchestration instance. + /// + /// + /// When set, the framework uses this version to route the restarted instance to the + /// appropriate orchestrator implementation. This is the safest migration point for + /// eternal orchestrations since the history is fully reset, eliminating any replay + /// conflict risk. + /// + public string? NewVersion { get; set; } +} diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index 49a1e6b09..857006983 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -395,16 +395,16 @@ public virtual Task CallSubOrchestratorAsync( /// replays when rebuilding state. /// /// The results of any incomplete tasks will be discarded when an orchestrator calls - /// . For example, if a timer is scheduled and then + /// . For example, if a timer is scheduled and then /// is called before the timer fires, the timer event will be discarded. The only exception to this /// is external events. By default, if an external event is received by an orchestration but not yet /// processed, the event is saved in the orchestration state unit it is received by a call to /// . These events will continue to remain in memory - /// even after an orchestrator restarts using . You can disable this behavior and + /// even after an orchestrator restarts using . You can disable this behavior and /// remove any saved external events by specifying false for the /// parameter value. /// - /// Orchestrator implementations should complete immediately after calling the method. + /// Orchestrator implementations should complete immediately after calling the method. /// /// /// The JSON-serializable input data to re-initialize the instance with. @@ -415,6 +415,31 @@ public virtual Task CallSubOrchestratorAsync( /// public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true); + /// + /// Restarts the orchestration with a new version, clearing the history. + /// + /// + /// + /// This overload allows specifying a new version for the restarted orchestration, enabling + /// version-based dispatch. The new version is used by the framework to route the restarted + /// instance to the appropriate orchestrator implementation. This is the safest migration point + /// for eternal orchestrations since the history is fully reset. + /// + /// Orchestrator implementations should complete immediately after calling this method. + /// + /// + /// Options for the continue-as-new operation, including the new version. + /// The JSON-serializable input data to re-initialize the instance with. + /// + /// If set to true, re-adds any unprocessed external events into the new execution + /// history when the orchestration instance restarts. If false, any unprocessed + /// external events will be discarded when the orchestration instance restarts. + /// + public virtual void ContinueAsNew(ContinueAsNewOptions options, object? newInput = null, bool preserveUnprocessedEvents = true) + { + this.ContinueAsNew(newInput, preserveUnprocessedEvents); + } + /// /// Creates a new GUID that is safe for replay within an orchestration or operation. /// diff --git a/src/Abstractions/TaskOrchestrator.cs b/src/Abstractions/TaskOrchestrator.cs index 07dca1e52..99b014182 100644 --- a/src/Abstractions/TaskOrchestrator.cs +++ b/src/Abstractions/TaskOrchestrator.cs @@ -78,7 +78,7 @@ public interface ITaskOrchestrator /// /// /// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are -/// bounded or use to restart an orchestrator with a new +/// bounded or use to restart an orchestrator with a new /// input. /// /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 98ba27211..c5c008958 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -226,7 +226,7 @@ public override async Task CallSubOrchestratorAsync( version, instanceId, policy.ToDurableTaskCoreRetryOptions(), - input, + input, options.Tags); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) @@ -236,7 +236,7 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags), orchestratorName.Name, handler, @@ -248,7 +248,7 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags); } } @@ -337,7 +337,20 @@ public override void SetCustomStatus(object? customStatus) /// public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true) { - this.innerContext.ContinueAsNew(newInput); + this.ContinueAsNew(options: null, newInput, preserveUnprocessedEvents); + } + + /// + public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput = null, bool preserveUnprocessedEvents = true) + { + if (options?.NewVersion is not null) + { + this.innerContext.ContinueAsNew(options.NewVersion, newInput); + } + else + { + this.innerContext.ContinueAsNew(newInput); + } if (preserveUnprocessedEvents) { diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 1114029bd..9bb8795fa 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -580,6 +580,36 @@ public async Task ContinueAsNew() Assert.Equal(10, metadata.ReadOutputAs()); } + [Fact] + public async Task ContinueAsNewWithNewVersion() + { + TaskName orchestratorName = nameof(ContinueAsNewWithNewVersion); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => + { + if (input == 0) + { + // First generation: migrate to "v2" + await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); + ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1); + return string.Empty; + } + + // Second generation: return the version to verify it changed + return ctx.Version; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 0); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("v2", metadata.ReadOutputAs()); + } + [Fact] public async Task SubOrchestration() { diff --git a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs index 54e8bfb63..5aa5c2db8 100644 --- a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs +++ b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs @@ -46,6 +46,103 @@ static void VerifyWrapper( wrapper.GetInput().Should().Be(input); } + [Fact] + public void ContinueAsNew_WithoutVersion_CallsInnerContextWithoutVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew("new-input", preserveUnprocessedEvents: false); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().BeNull(); + } + + [Fact] + public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, "new-input", preserveUnprocessedEvents: false); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().Be("v2"); + } + + [Fact] + public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew(options: null, newInput: "new-input", preserveUnprocessedEvents: false); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().BeNull(); + } + + class TrackingOrchestrationContext : OrchestrationContext + { + public TrackingOrchestrationContext() + { + this.OrchestrationInstance = new() + { + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), + }; + } + + public object? LastContinueAsNewInput { get; private set; } + + public string? LastContinueAsNewVersion { get; private set; } + + public override void ContinueAsNew(object input) + { + this.LastContinueAsNewInput = input; + this.LastContinueAsNewVersion = null; + } + + public override void ContinueAsNew(string newVersion, object input) + { + this.LastContinueAsNewInput = input; + this.LastContinueAsNewVersion = newVersion; + } + + public override Task CreateSubOrchestrationInstance(string name, string version, object input) + => throw new NotImplementedException(); + + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input) + => throw new NotImplementedException(); + + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input, IDictionary tags) + => throw new NotImplementedException(); + + public override Task CreateTimer(DateTime fireAt, T state) + => throw new NotImplementedException(); + + public override Task CreateTimer(DateTime fireAt, T state, CancellationToken cancelToken) + => throw new NotImplementedException(); + + public override Task ScheduleTask(string name, string version, params object[] parameters) + => throw new NotImplementedException(); + + public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData) + => throw new NotImplementedException(); + } + class TestOrchestrationContext : OrchestrationContext { public TestOrchestrationContext() From a8a98e9e0d13de180e3c2ddbfe27769a1cbacae6 Mon Sep 17 00:00:00 2001 From: wangbill Date: Sat, 21 Mar 2026 00:24:12 -0700 Subject: [PATCH 2/6] Address PR review: fix nullability, remove default, fix typo, seal test class --- src/Abstractions/TaskOrchestrationContext.cs | 6 +++--- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +- .../Shims/TaskOrchestrationContextWrapperTests.cs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index 857006983..2fd3a14f5 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using Microsoft.DurableTask.Abstractions; @@ -398,7 +398,7 @@ public virtual Task CallSubOrchestratorAsync( /// . For example, if a timer is scheduled and then /// is called before the timer fires, the timer event will be discarded. The only exception to this /// is external events. By default, if an external event is received by an orchestration but not yet - /// processed, the event is saved in the orchestration state unit it is received by a call to + /// processed, the event is saved in the orchestration state until it is received by a call to /// . These events will continue to remain in memory /// even after an orchestrator restarts using . You can disable this behavior and /// remove any saved external events by specifying false for the @@ -435,7 +435,7 @@ public virtual Task CallSubOrchestratorAsync( /// history when the orchestration instance restarts. If false, any unprocessed /// external events will be discarded when the orchestration instance restarts. /// - public virtual void ContinueAsNew(ContinueAsNewOptions options, object? newInput = null, bool preserveUnprocessedEvents = true) + public virtual void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) { this.ContinueAsNew(newInput, preserveUnprocessedEvents); } diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index c5c008958..c9c02032a 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -341,7 +341,7 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce } /// - public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput = null, bool preserveUnprocessedEvents = true) + public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) { if (options?.NewVersion is not null) { diff --git a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs index 5aa5c2db8..d164454dd 100644 --- a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs +++ b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs @@ -94,7 +94,7 @@ public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion() innerContext.LastContinueAsNewVersion.Should().BeNull(); } - class TrackingOrchestrationContext : OrchestrationContext + sealed class TrackingOrchestrationContext : OrchestrationContext { public TrackingOrchestrationContext() { From c344e348c8d3fbbb30fa2cc8e1731a4426ec7812 Mon Sep 17 00:00:00 2001 From: wangbill Date: Sat, 21 Mar 2026 00:37:55 -0700 Subject: [PATCH 3/6] Address round 2: guard empty version, update XML docs --- src/Abstractions/TaskOrchestrationContext.cs | 18 ++++++++++++------ .../Shims/TaskOrchestrationContextWrapper.cs | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index 2fd3a14f5..e83e01007 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -416,19 +416,25 @@ public virtual Task CallSubOrchestratorAsync( public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true); /// - /// Restarts the orchestration with a new version, clearing the history. + /// Restarts the orchestration, optionally with a new version, clearing the history. /// /// /// - /// This overload allows specifying a new version for the restarted orchestration, enabling - /// version-based dispatch. The new version is used by the framework to route the restarted - /// instance to the appropriate orchestrator implementation. This is the safest migration point - /// for eternal orchestrations since the history is fully reset. + /// This overload accepts to control the restart behavior. + /// When is set, the framework uses the new version + /// to route the restarted instance to the appropriate orchestrator implementation, enabling + /// version-based dispatch. When no version is specified (i.e., is + /// null or NewVersion is not set), this method behaves identically to + /// . + /// + /// The default implementation ignores and delegates to + /// . Subclasses that support version-based + /// dispatch should override this method. /// /// Orchestrator implementations should complete immediately after calling this method. /// /// - /// Options for the continue-as-new operation, including the new version. + /// Options for the continue-as-new operation, including an optional new version. /// The JSON-serializable input data to re-initialize the instance with. /// /// If set to true, re-adds any unprocessed external events into the new execution diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index c9c02032a..3351dad4b 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -343,7 +343,7 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce /// public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) { - if (options?.NewVersion is not null) + if (!string.IsNullOrEmpty(options?.NewVersion)) { this.innerContext.ContinueAsNew(options.NewVersion, newInput); } From d5f10a7c10213ae7da1d9c4b4f156a814568c946 Mon Sep 17 00:00:00 2001 From: wangbill Date: Sat, 21 Mar 2026 09:56:01 -0700 Subject: [PATCH 4/6] Update src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 3351dad4b..fdd8e5e44 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -343,7 +343,7 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce /// public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) { - if (!string.IsNullOrEmpty(options?.NewVersion)) + if (!string.IsNullOrWhiteSpace(options?.NewVersion)) { this.innerContext.ContinueAsNew(options.NewVersion, newInput); } From 916e1e3b13c6a9aa533b396b61278486171d543d Mon Sep 17 00:00:00 2001 From: wangbill Date: Sat, 21 Mar 2026 10:53:33 -0700 Subject: [PATCH 5/6] Remove preserveUnprocessedEvents default to eliminate overload ambiguity with ContinueAsNew(null, false) --- src/Abstractions/TaskOrchestrationContext.cs | 2 +- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +- test/Grpc.IntegrationTests/OrchestrationPatterns.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index e83e01007..c49906c40 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -441,7 +441,7 @@ public virtual Task CallSubOrchestratorAsync( /// history when the orchestration instance restarts. If false, any unprocessed /// external events will be discarded when the orchestration instance restarts. /// - public virtual void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) + public virtual void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents) { this.ContinueAsNew(newInput, preserveUnprocessedEvents); } diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index fdd8e5e44..45ff64813 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -341,7 +341,7 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce } /// - public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents = true) + public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents) { if (!string.IsNullOrWhiteSpace(options?.NewVersion)) { diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 9bb8795fa..f8e1e1fef 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -593,7 +593,7 @@ public async Task ContinueAsNewWithNewVersion() { // First generation: migrate to "v2" await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); - ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1); + ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1, true); return string.Empty; } From 2a809a6720516d7e0c6e0afa192dfa0423433bc7 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 23 Mar 2026 15:59:35 -0700 Subject: [PATCH 6/6] Fold newInput and preserveUnprocessedEvents into ContinueAsNewOptions Address review feedback from cgillum: fold newInput and preserveUnprocessedEvents into ContinueAsNewOptions so the new overload takes a single ContinueAsNewOptions parameter. - Add NewInput and PreserveUnprocessedEvents properties to ContinueAsNewOptions - Change ContinueAsNew(ContinueAsNewOptions?, object?, bool) to ContinueAsNew(ContinueAsNewOptions) - Update TaskOrchestrationContextWrapper to match - Update unit and integration tests --- src/Abstractions/ContinueAsNewOptions.cs | 18 ++++++++++++- src/Abstractions/TaskOrchestrationContext.cs | 27 ++++++++----------- .../Shims/TaskOrchestrationContextWrapper.cs | 18 ++++++++----- .../OrchestrationPatterns.cs | 2 +- .../TaskOrchestrationContextWrapperTests.cs | 15 ++++++++--- 5 files changed, 53 insertions(+), 27 deletions(-) diff --git a/src/Abstractions/ContinueAsNewOptions.cs b/src/Abstractions/ContinueAsNewOptions.cs index 9e5689e4f..95fb85788 100644 --- a/src/Abstractions/ContinueAsNewOptions.cs +++ b/src/Abstractions/ContinueAsNewOptions.cs @@ -4,10 +4,26 @@ namespace Microsoft.DurableTask; /// -/// Options for . +/// Options for . /// public class ContinueAsNewOptions { + /// + /// Gets or sets the JSON-serializable input data to re-initialize the instance with. + /// + public object? NewInput { get; set; } + + /// + /// Gets or sets a value indicating whether to preserve unprocessed external events + /// across the restart. Defaults to true. + /// + /// + /// When set to true, any unprocessed external events are re-added into the new execution + /// history when the orchestration instance restarts. When false, any unprocessed + /// external events will be discarded when the orchestration instance restarts. + /// + public bool PreserveUnprocessedEvents { get; set; } = true; + /// /// Gets or sets the new version for the restarted orchestration instance. /// diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index c49906c40..6f6fa793a 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -416,34 +416,29 @@ public virtual Task CallSubOrchestratorAsync( public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true); /// - /// Restarts the orchestration, optionally with a new version, clearing the history. + /// Restarts the orchestration with the specified options, clearing the history. /// /// /// - /// This overload accepts to control the restart behavior. + /// This overload accepts to control the restart behavior, + /// including the new input, whether to preserve unprocessed events, and an optional new version. /// When is set, the framework uses the new version /// to route the restarted instance to the appropriate orchestrator implementation, enabling - /// version-based dispatch. When no version is specified (i.e., is - /// null or NewVersion is not set), this method behaves identically to - /// . + /// version-based dispatch. /// - /// The default implementation ignores and delegates to - /// . Subclasses that support version-based + /// The default implementation delegates to + /// using the input and preserve-events values + /// from . Subclasses that support version-based /// dispatch should override this method. /// /// Orchestrator implementations should complete immediately after calling this method. /// /// - /// Options for the continue-as-new operation, including an optional new version. - /// The JSON-serializable input data to re-initialize the instance with. - /// - /// If set to true, re-adds any unprocessed external events into the new execution - /// history when the orchestration instance restarts. If false, any unprocessed - /// external events will be discarded when the orchestration instance restarts. - /// - public virtual void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents) + /// Options for the continue-as-new operation. + public virtual void ContinueAsNew(ContinueAsNewOptions options) { - this.ContinueAsNew(newInput, preserveUnprocessedEvents); + Check.NotNull(options); + this.ContinueAsNew(options.NewInput, options.PreserveUnprocessedEvents); } /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 45ff64813..bfbf1e47f 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -337,22 +337,28 @@ public override void SetCustomStatus(object? customStatus) /// public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true) { - this.ContinueAsNew(options: null, newInput, preserveUnprocessedEvents); + this.ContinueAsNew(new ContinueAsNewOptions + { + NewInput = newInput, + PreserveUnprocessedEvents = preserveUnprocessedEvents, + }); } /// - public override void ContinueAsNew(ContinueAsNewOptions? options, object? newInput, bool preserveUnprocessedEvents) + public override void ContinueAsNew(ContinueAsNewOptions options) { - if (!string.IsNullOrWhiteSpace(options?.NewVersion)) + Check.NotNull(options); + + if (!string.IsNullOrWhiteSpace(options.NewVersion)) { - this.innerContext.ContinueAsNew(options.NewVersion, newInput); + this.innerContext.ContinueAsNew(options.NewVersion, options.NewInput); } else { - this.innerContext.ContinueAsNew(newInput); + this.innerContext.ContinueAsNew(options.NewInput); } - if (preserveUnprocessedEvents) + if (options.PreserveUnprocessedEvents) { // Send all the buffered external events to ourself. OrchestrationInstance instance = new() { InstanceId = this.InstanceId }; diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index f8e1e1fef..09acb3650 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -593,7 +593,7 @@ public async Task ContinueAsNewWithNewVersion() { // First generation: migrate to "v2" await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); - ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, input + 1, true); + ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2", NewInput = input + 1 }); return string.Empty; } diff --git a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs index d164454dd..06df43170 100644 --- a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs +++ b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs @@ -71,7 +71,12 @@ public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion() TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); // Act - wrapper.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2" }, "new-input", preserveUnprocessedEvents: false); + wrapper.ContinueAsNew(new ContinueAsNewOptions + { + NewVersion = "v2", + NewInput = "new-input", + PreserveUnprocessedEvents = false, + }); // Assert innerContext.LastContinueAsNewInput.Should().Be("new-input"); @@ -79,7 +84,7 @@ public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion() } [Fact] - public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion() + public void ContinueAsNew_WithOptionsNoVersion_CallsInnerContextWithoutVersion() { // Arrange TrackingOrchestrationContext innerContext = new(); @@ -87,7 +92,11 @@ public void ContinueAsNew_WithNullOptions_CallsInnerContextWithoutVersion() TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); // Act - wrapper.ContinueAsNew(options: null, newInput: "new-input", preserveUnprocessedEvents: false); + wrapper.ContinueAsNew(new ContinueAsNewOptions + { + NewInput = "new-input", + PreserveUnprocessedEvents = false, + }); // Assert innerContext.LastContinueAsNewInput.Should().Be("new-input");