@@ -277,6 +277,10 @@ def predicate(inst: OrchestrationInstance) -> bool:
277277 instance = self ._wait_for_state (request .instanceId , predicate , timeout = context .time_remaining ())
278278
279279 if not instance :
280+ with self ._lock :
281+ if request .instanceId in self ._instances :
282+ context .abort (grpc .StatusCode .DEADLINE_EXCEEDED ,
283+ f"Timed out waiting for instance '{ request .instanceId } ' to start" )
280284 return pb .GetInstanceResponse (exists = False )
281285
282286 return self ._build_instance_response (instance , request .getInputsAndOutputs )
@@ -290,6 +294,10 @@ def WaitForInstanceCompletion(self, request: pb.GetInstanceRequest, context):
290294 )
291295
292296 if not instance :
297+ with self ._lock :
298+ if request .instanceId in self ._instances :
299+ context .abort (grpc .StatusCode .DEADLINE_EXCEEDED ,
300+ f"Timed out waiting for instance '{ request .instanceId } ' to complete" )
293301 return pb .GetInstanceResponse (exists = False )
294302
295303 return self ._build_instance_response (instance , request .getInputsAndOutputs )
@@ -309,7 +317,14 @@ def RaiseEvent(self, request: pb.RaiseEventRequest, context):
309317 )
310318 instance .pending_events .append (event )
311319 instance .last_updated_at = datetime .now (timezone .utc )
312- self ._enqueue_orchestration (instance .instance_id )
320+
321+ # Don't dispatch work for suspended or terminal orchestrations;
322+ # suspended events will be delivered when the orchestration is
323+ # resumed, and terminal orchestrations can't process new events.
324+ not_terminal = not self ._is_terminal_status (instance .status )
325+ not_suspended = instance .status != pb .ORCHESTRATION_STATUS_SUSPENDED
326+ if not_terminal and not_suspended :
327+ self ._enqueue_orchestration (instance .instance_id )
313328
314329 self ._logger .info (f"Raised event '{ request .name } ' for instance '{ request .instanceId } '" )
315330 return pb .RaiseEventResponse ()
0 commit comments