diff --git a/src/Abstractions/ContinueAsNewOptions.cs b/src/Abstractions/ContinueAsNewOptions.cs new file mode 100644 index 000000000..95fb85788 --- /dev/null +++ b/src/Abstractions/ContinueAsNewOptions.cs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Options for . +/// +public class ContinueAsNewOptions +{ + /// + /// Gets or sets the JSON-serializable input data to re-initialize the instance with. + /// + public object? NewInput { get; set; } + + /// + /// Gets or sets a value indicating whether to preserve unprocessed external events + /// across the restart. Defaults to true. + /// + /// + /// When set to true, any unprocessed external events are re-added into the new execution + /// history when the orchestration instance restarts. When false, any unprocessed + /// external events will be discarded when the orchestration instance restarts. + /// + public bool PreserveUnprocessedEvents { get; set; } = true; + + /// + /// Gets or sets the new version for the restarted orchestration instance. + /// + /// + /// When set, the framework uses this version to route the restarted instance to the + /// appropriate orchestrator implementation. This is the safest migration point for + /// eternal orchestrations since the history is fully reset, eliminating any replay + /// conflict risk. + /// + public string? NewVersion { get; set; } +} diff --git a/src/Abstractions/TaskOrchestrationContext.cs b/src/Abstractions/TaskOrchestrationContext.cs index 49a1e6b09..6f6fa793a 100644 --- a/src/Abstractions/TaskOrchestrationContext.cs +++ b/src/Abstractions/TaskOrchestrationContext.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using Microsoft.DurableTask.Abstractions; @@ -395,16 +395,16 @@ public virtual Task CallSubOrchestratorAsync( /// replays when rebuilding state. /// /// The results of any incomplete tasks will be discarded when an orchestrator calls - /// . For example, if a timer is scheduled and then + /// . For example, if a timer is scheduled and then /// is called before the timer fires, the timer event will be discarded. The only exception to this /// is external events. By default, if an external event is received by an orchestration but not yet - /// processed, the event is saved in the orchestration state unit it is received by a call to + /// processed, the event is saved in the orchestration state until it is received by a call to /// . These events will continue to remain in memory - /// even after an orchestrator restarts using . You can disable this behavior and + /// even after an orchestrator restarts using . You can disable this behavior and /// remove any saved external events by specifying false for the /// parameter value. /// - /// Orchestrator implementations should complete immediately after calling the method. + /// Orchestrator implementations should complete immediately after calling the method. /// /// /// The JSON-serializable input data to re-initialize the instance with. @@ -415,6 +415,32 @@ public virtual Task CallSubOrchestratorAsync( /// public abstract void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true); + /// + /// Restarts the orchestration with the specified options, clearing the history. + /// + /// + /// + /// This overload accepts to control the restart behavior, + /// including the new input, whether to preserve unprocessed events, and an optional new version. + /// When is set, the framework uses the new version + /// to route the restarted instance to the appropriate orchestrator implementation, enabling + /// version-based dispatch. + /// + /// The default implementation delegates to + /// using the input and preserve-events values + /// from . Subclasses that support version-based + /// dispatch should override this method. + /// + /// Orchestrator implementations should complete immediately after calling this method. + /// + /// + /// Options for the continue-as-new operation. + public virtual void ContinueAsNew(ContinueAsNewOptions options) + { + Check.NotNull(options); + this.ContinueAsNew(options.NewInput, options.PreserveUnprocessedEvents); + } + /// /// Creates a new GUID that is safe for replay within an orchestration or operation. /// diff --git a/src/Abstractions/TaskOrchestrator.cs b/src/Abstractions/TaskOrchestrator.cs index 07dca1e52..99b014182 100644 --- a/src/Abstractions/TaskOrchestrator.cs +++ b/src/Abstractions/TaskOrchestrator.cs @@ -78,7 +78,7 @@ public interface ITaskOrchestrator /// /// /// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are -/// bounded or use to restart an orchestrator with a new +/// bounded or use to restart an orchestrator with a new /// input. /// /// diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 98ba27211..bfbf1e47f 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -226,7 +226,7 @@ public override async Task CallSubOrchestratorAsync( version, instanceId, policy.ToDurableTaskCoreRetryOptions(), - input, + input, options.Tags); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) @@ -236,7 +236,7 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags), orchestratorName.Name, handler, @@ -248,7 +248,7 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags); } } @@ -337,9 +337,28 @@ public override void SetCustomStatus(object? customStatus) /// public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true) { - this.innerContext.ContinueAsNew(newInput); + this.ContinueAsNew(new ContinueAsNewOptions + { + NewInput = newInput, + PreserveUnprocessedEvents = preserveUnprocessedEvents, + }); + } + + /// + public override void ContinueAsNew(ContinueAsNewOptions options) + { + Check.NotNull(options); + + if (!string.IsNullOrWhiteSpace(options.NewVersion)) + { + this.innerContext.ContinueAsNew(options.NewVersion, options.NewInput); + } + else + { + this.innerContext.ContinueAsNew(options.NewInput); + } - if (preserveUnprocessedEvents) + if (options.PreserveUnprocessedEvents) { // Send all the buffered external events to ourself. OrchestrationInstance instance = new() { InstanceId = this.InstanceId }; diff --git a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs index 1114029bd..09acb3650 100644 --- a/test/Grpc.IntegrationTests/OrchestrationPatterns.cs +++ b/test/Grpc.IntegrationTests/OrchestrationPatterns.cs @@ -580,6 +580,36 @@ public async Task ContinueAsNew() Assert.Equal(10, metadata.ReadOutputAs()); } + [Fact] + public async Task ContinueAsNewWithNewVersion() + { + TaskName orchestratorName = nameof(ContinueAsNewWithNewVersion); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async (ctx, input) => + { + if (input == 0) + { + // First generation: migrate to "v2" + await ctx.CreateTimer(TimeSpan.Zero, CancellationToken.None); + ctx.ContinueAsNew(new ContinueAsNewOptions { NewVersion = "v2", NewInput = input + 1 }); + return string.Empty; + } + + // Second generation: return the version to verify it changed + return ctx.Version; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input: 0); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("v2", metadata.ReadOutputAs()); + } + [Fact] public async Task SubOrchestration() { diff --git a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs index 54e8bfb63..06df43170 100644 --- a/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs +++ b/test/Worker/Core.Tests/Shims/TaskOrchestrationContextWrapperTests.cs @@ -46,6 +46,112 @@ static void VerifyWrapper( wrapper.GetInput().Should().Be(input); } + [Fact] + public void ContinueAsNew_WithoutVersion_CallsInnerContextWithoutVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew("new-input", preserveUnprocessedEvents: false); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().BeNull(); + } + + [Fact] + public void ContinueAsNew_WithVersion_CallsInnerContextWithVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew(new ContinueAsNewOptions + { + NewVersion = "v2", + NewInput = "new-input", + PreserveUnprocessedEvents = false, + }); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().Be("v2"); + } + + [Fact] + public void ContinueAsNew_WithOptionsNoVersion_CallsInnerContextWithoutVersion() + { + // Arrange + TrackingOrchestrationContext innerContext = new(); + OrchestrationInvocationContext invocationContext = new("Test", new(), NullLoggerFactory.Instance, null); + TaskOrchestrationContextWrapper wrapper = new(innerContext, invocationContext, "input"); + + // Act + wrapper.ContinueAsNew(new ContinueAsNewOptions + { + NewInput = "new-input", + PreserveUnprocessedEvents = false, + }); + + // Assert + innerContext.LastContinueAsNewInput.Should().Be("new-input"); + innerContext.LastContinueAsNewVersion.Should().BeNull(); + } + + sealed class TrackingOrchestrationContext : OrchestrationContext + { + public TrackingOrchestrationContext() + { + this.OrchestrationInstance = new() + { + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), + }; + } + + public object? LastContinueAsNewInput { get; private set; } + + public string? LastContinueAsNewVersion { get; private set; } + + public override void ContinueAsNew(object input) + { + this.LastContinueAsNewInput = input; + this.LastContinueAsNewVersion = null; + } + + public override void ContinueAsNew(string newVersion, object input) + { + this.LastContinueAsNewInput = input; + this.LastContinueAsNewVersion = newVersion; + } + + public override Task CreateSubOrchestrationInstance(string name, string version, object input) + => throw new NotImplementedException(); + + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input) + => throw new NotImplementedException(); + + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input, IDictionary tags) + => throw new NotImplementedException(); + + public override Task CreateTimer(DateTime fireAt, T state) + => throw new NotImplementedException(); + + public override Task CreateTimer(DateTime fireAt, T state, CancellationToken cancelToken) + => throw new NotImplementedException(); + + public override Task ScheduleTask(string name, string version, params object[] parameters) + => throw new NotImplementedException(); + + public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData) + => throw new NotImplementedException(); + } + class TestOrchestrationContext : OrchestrationContext { public TestOrchestrationContext()