Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ def resume_orchestration(self, instance_id: str) -> None:
self._logger.info(f"Resuming instance '{instance_id}'.")
self._stub.ResumeInstance(req)

def rewind_orchestration(self, instance_id: str, *,
                         reason: Optional[str] = None) -> None:
    """Rewinds a failed orchestration instance to its last known good state.

    Rewind removes failed task and sub-orchestration results from the
    orchestration history and replays the orchestration from the last
    successful checkpoint. Activities that previously succeeded are
    not re-executed; only failed work is retried.

    Args:
        instance_id: The ID of the orchestration instance to rewind.
        reason: An optional reason string describing why the orchestration
            is being rewound. ``None`` leaves the request's ``reason``
            field unset.

    Returns:
        None. The request is sent through the ``RewindInstance`` stub call
        and the response object is not used.
    """
    req = pb.RewindInstanceRequest(
        instanceId=instance_id,
        # get_string_value returns None for a None input, so an omitted
        # reason is not placed on the request.
        reason=helpers.get_string_value(reason))

    self._logger.info(f"Rewinding instance '{instance_id}'.")
    self._stub.RewindInstance(req)

def restart_orchestration(self, instance_id: str, *,
restart_with_new_instance_id: bool = False) -> str:
"""Restarts an existing orchestration instance.
Expand Down
15 changes: 15 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ def new_terminated_event(*, encoded_output: Optional[str] = None) -> pb.HistoryE
)


def new_execution_completed_event(
        status: 'pb.OrchestrationStatus',
        encoded_result: Optional[str] = None,
        failure_details: Optional['pb.TaskFailureDetails'] = None) -> pb.HistoryEvent:
    """Build the history event that records an execution's completion.

    Args:
        status: Orchestration status stored on the completion payload.
        encoded_result: Optional encoded result; it is passed through
            ``get_string_value`` before being stored.
        failure_details: Optional failure details, stored unchanged.

    Returns:
        A ``HistoryEvent`` with ``eventId`` set to -1, a default-constructed
        timestamp, and its ``executionCompleted`` field populated.
    """
    # Assemble the payload first so the outer event reads as a flat record.
    completed = pb.ExecutionCompletedEvent(
        orchestrationStatus=status,
        result=get_string_value(encoded_result),
        failureDetails=failure_details,
    )
    return pb.HistoryEvent(
        eventId=-1,
        timestamp=timestamp_pb2.Timestamp(),
        executionCompleted=completed,
    )


def get_string_value(val: Optional[str]) -> Optional[wrappers_pb2.StringValue]:
if val is None:
return None
Expand Down
3 changes: 1 addition & 2 deletions durabletask/testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ The in-memory backend is designed for testing and has some limitations compared
1. **No persistence**: All state is lost when the backend is stopped
2. **No distributed execution**: Runs in a single process
3. **No history streaming**: StreamInstanceHistory is not implemented
4. **No rewind**: RewindInstance is not implemented
5. **No recursive termination**: Recursive termination is not supported
4. **No recursive termination**: Recursive termination is not supported

### Best Practices

Expand Down
179 changes: 173 additions & 6 deletions durabletask/testing/in_memory_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,16 @@ def StartInstance(self, request: pb.CreateInstanceRequest, context):
)
self._next_completion_token += 1

# Add initial events to start the orchestration
orchestrator_started = helpers.new_orchestrator_started_event(start_time)
# Add initial events to start the orchestration.
# orchestratorStarted bookends each replay batch and is
# always the very first event, followed by executionStarted.
execution_started = helpers.new_execution_started_event(
request.name, instance_id,
request.input.value if request.input else None,
dict(request.tags) if request.tags else None,
version=version,
)
orchestrator_started = helpers.new_orchestrator_started_event(start_time)

instance.pending_events.append(orchestrator_started)
instance.pending_events.append(execution_started)
Expand Down Expand Up @@ -612,6 +614,22 @@ def CompleteOrchestratorTask(self, request: pb.OrchestratorResponse, context):
instance.completion_token = self._next_completion_token
self._next_completion_token += 1

# Bookend the replay with orchestratorCompleted.
# Skip for continue-as-new (status is PENDING after reset).
if instance.status != pb.ORCHESTRATION_STATUS_PENDING:
instance.history.append(helpers.new_orchestrator_completed_event())

# executionCompleted is the very last event when the
# orchestration reaches a terminal state.
if self._is_terminal_status(instance.status):
instance.history.append(
helpers.new_execution_completed_event(
instance.status,
instance.output,
instance.failure_details,
)
)

# Remove from in-flight before notifying or re-enqueuing
self._orchestration_in_flight.discard(request.instanceId)

Expand Down Expand Up @@ -981,8 +999,33 @@ def DeleteTaskHub(self, request: pb.DeleteTaskHubRequest, context):
return pb.DeleteTaskHubResponse()

def RewindInstance(self, request: pb.RewindInstanceRequest, context):
    """Rewinds a failed orchestration instance.

    The backend validates the instance is in a failed state, appends
    an ``ExecutionRewoundEvent`` to the pending events, resets the
    instance status to RUNNING, and re-enqueues the orchestration
    so the worker can replay it and produce a
    ``RewindOrchestrationAction`` with the corrected history.

    Args:
        request: The rewind request; ``instanceId`` selects the instance
            and the optional ``reason`` is forwarded to the rewind event.
        context: The gRPC servicer context used to abort the call with
            ``NOT_FOUND`` (unknown instance) or ``FAILED_PRECONDITION``
            (instance is not in the failed state).

    Returns:
        An empty ``pb.RewindInstanceResponse``.
    """
    with self._lock:
        instance = self._instances.get(request.instanceId)
        if not instance:
            context.abort(
                grpc.StatusCode.NOT_FOUND,
                f"Orchestration instance '{request.instanceId}' not found")
            # Only reached if the supplied context's abort() returns
            # instead of raising; keeps the handler from falling through.
            return pb.RewindInstanceResponse()

        if instance.status != pb.ORCHESTRATION_STATUS_FAILED:
            context.abort(
                grpc.StatusCode.FAILED_PRECONDITION,
                f"Orchestration instance '{request.instanceId}' is not in a failed state")
            return pb.RewindInstanceResponse()

        # An unset reason is passed on as None rather than as "".
        reason = request.reason.value if request.HasField("reason") else None
        # _prepare_rewind documents that it must be called with the lock held.
        self._prepare_rewind(instance, reason)

    self._logger.info(f"Rewound instance '{request.instanceId}'")
    return pb.RewindInstanceResponse()

def AbandonTaskActivityWorkItem(self, request: pb.AbandonActivityTaskRequest, context):
"""Abandons an activity work item."""
Expand Down Expand Up @@ -1067,9 +1110,9 @@ def _create_instance_internal(self, instance_id: str, name: str,
)
self._next_completion_token += 1

orchestrator_started = helpers.new_orchestrator_started_event(now)
execution_started = helpers.new_execution_started_event(
name, instance_id, encoded_input, version=version)
orchestrator_started = helpers.new_orchestrator_started_event(now)
instance.pending_events.append(orchestrator_started)
instance.pending_events.append(execution_started)

Expand Down Expand Up @@ -1196,6 +1239,8 @@ def _process_action(self, instance: OrchestrationInstance, action: pb.Orchestrat
self._process_send_event_action(action.sendEvent)
elif action.HasField("sendEntityMessage"):
self._process_send_entity_message_action(instance, action)
elif action.HasField("rewindOrchestration"):
self._process_rewind_orchestration_action(instance, action.rewindOrchestration)

def _process_complete_orchestration_action(self, instance: OrchestrationInstance,
complete_action: pb.CompleteOrchestrationAction):
Expand Down Expand Up @@ -1230,11 +1275,11 @@ def _process_complete_orchestration_action(self, instance: OrchestrationInstance
# Build the new pending events in the correct order:
# OrchestratorStarted, ExecutionStarted, carryover, new arrivals
now = datetime.now(timezone.utc)
orchestrator_started = helpers.new_orchestrator_started_event(now)
execution_started = helpers.new_execution_started_event(
instance.name, instance.instance_id, new_input,
version=new_version,
)
orchestrator_started = helpers.new_orchestrator_started_event(now)
instance.pending_events.append(orchestrator_started)
instance.pending_events.append(execution_started)
instance.pending_events.extend(carryover_events)
Expand Down Expand Up @@ -1558,6 +1603,128 @@ def _signal_entity_internal(self, entity_id: str, operation: str,
)
self._queue_entity_operation(entity_id, event)

def _prepare_rewind(self, instance: OrchestrationInstance,
                    reason: Optional[str] = None):
    """Prepares an orchestration instance for rewind.

    Appends an ``ExecutionRewoundEvent`` to the pending events, resets
    the instance status to RUNNING, and re-enqueues it so the worker
    can replay it. The actual history rewriting is done by the SDK
    worker when it processes the rewind event.

    Args:
        instance: The orchestration instance to rewind.
        reason: Optional reason string for the rewind. Only ``None`` is
            treated as "no reason"; an empty string is kept on the event.

    Note:
        Must be called while holding ``self._lock``.
    """
    # Reset instance state so it can be re-processed.
    instance.status = pb.ORCHESTRATION_STATUS_RUNNING
    instance.output = None
    instance.failure_details = None
    instance.last_updated_at = datetime.now(timezone.utc)

    # Clear any stale dispatched events.
    instance.dispatched_events.clear()

    # Add the ExecutionRewound event as a new pending event. The shared
    # helper tests for None rather than truthiness, so an explicitly
    # supplied empty reason is not silently dropped.
    rewind_event = pb.HistoryEvent(
        eventId=-1,
        timestamp=timestamp_pb2.Timestamp(),
        executionRewound=pb.ExecutionRewoundEvent(
            reason=helpers.get_string_value(reason),
        ),
    )
    instance.pending_events.append(rewind_event)

    # Refresh the completion token and enqueue. The instance is dropped
    # from the in-flight set first so it can be picked up again.
    instance.completion_token = self._next_completion_token
    self._next_completion_token += 1
    self._orchestration_in_flight.discard(instance.instance_id)
    self._enqueue_orchestration(instance.instance_id)

def _process_rewind_orchestration_action(
self, instance: OrchestrationInstance,
rewind_action: pb.RewindOrchestrationAction):
"""Processes a RewindOrchestrationAction returned by the SDK.

The action contains a ``newHistory`` field with the rewritten
history computed by the SDK (failed tasks and sub-orchestration
failures removed). The backend replaces the instance's history
with this new history, recursively rewinds any failed
sub-orchestrations, and re-enqueues the orchestration.

Args:
    instance: The orchestration instance whose history is replaced.
    rewind_action: The action carrying the rewritten ``newHistory``.

Note:
    This method acquires no lock itself. NOTE(review): it calls
    ``_prepare_rewind``, which requires ``self._lock`` to be held, so
    callers presumably hold the lock already -- confirm.
"""
# Copy the repeated field into a plain list before storing it.
new_history = list(rewind_action.newHistory)

# Replace history with the rewritten version.
instance.history = new_history
instance.status = pb.ORCHESTRATION_STATUS_RUNNING
instance.output = None
instance.failure_details = None
instance.last_updated_at = datetime.now(timezone.utc)

# Identify sub-orchestrations that were created but did not
# complete successfully — they need to be recursively rewound.
# Created events are keyed by their own eventId; completions are
# matched back to them through taskScheduledId.
completed_sub_orch_task_ids: set[int] = set()
created_sub_orch_events: dict[int, pb.HistoryEvent] = {}
for event in new_history:
if event.HasField("subOrchestrationInstanceCreated"):
created_sub_orch_events[event.eventId] = event
elif event.HasField("subOrchestrationInstanceCompleted"):
completed_sub_orch_task_ids.add(
event.subOrchestrationInstanceCompleted.taskScheduledId)

# Extract the rewind reason from the last ExecutionRewound event.
reason: Optional[str] = None
for event in reversed(new_history):
if event.HasField("executionRewound"):
if event.executionRewound.HasField("reason"):
reason = event.executionRewound.reason.value
break

# Recursively rewind failed sub-orchestrations. If the sub was
# purged (no longer in _instances), re-create it from the
# subOrchestrationInstanceCreated event so it runs fresh.
for task_id, event in created_sub_orch_events.items():
if task_id not in completed_sub_orch_task_ids:
sub_info = event.subOrchestrationInstanceCreated
sub_instance_id = sub_info.instanceId
sub_instance = self._instances.get(sub_instance_id)
if sub_instance is None:
# Sub-orchestration was purged — re-create it.
sub_name = sub_info.name
sub_input = sub_info.input.value if sub_info.HasField("input") else None
sub_version = sub_info.version.value if sub_info.HasField("version") else None
self._create_instance_internal(
sub_instance_id, sub_name, sub_input, version=sub_version)
elif sub_instance.status == pb.ORCHESTRATION_STATUS_FAILED:
# The parent's rewind reason is reused for the child rewind.
self._prepare_rewind(sub_instance, reason)
# presumably registers the parent to receive the sub-orchestration's
# outcome — verify against _watch_sub_orchestration.
self._watch_sub_orchestration(
instance.instance_id, sub_instance_id, task_id)

# Re-enqueue so the orchestration replays with the clean history.
# The executionRewound event is added to pending_events so the
# worker can see it in new_events; the worker uses the presence
# of executionRewound in old_events (history) to distinguish
# this normal post-rewind replay from the initial rewind
# short-circuit. Note: we do NOT add orchestratorStarted here
# because the work-item dispatch loop already inserts one when
# the instance has non-empty history.
# NOTE(review): this forward scan selects the first executionRewound
# event in new_history, while the reason lookup above walks the
# history in reverse — confirm that is intended when a history
# contains more than one rewind.
rewind_event = None
for event in new_history:
if event.HasField("executionRewound"):
rewind_event = event
break
# Any previously queued or dispatched events are discarded before the
# rewind marker is queued.
instance.pending_events.clear()
instance.dispatched_events.clear()
if rewind_event is not None:
instance.pending_events.append(rewind_event)
# Issue a fresh completion token and drop the instance from the
# in-flight set before enqueuing it again.
instance.completion_token = self._next_completion_token
self._next_completion_token += 1
self._orchestration_in_flight.discard(instance.instance_id)
self._enqueue_orchestration(instance.instance_id)

def _enqueue_entity(self, entity_id: str):
"""Enqueues an entity for processing."""
if entity_id not in self._entity_queue_set:
Expand Down
Loading
Loading