Scheduler silently stops dispatching cron tasks if a send task hangs
Summary
SchedulerLoop.run in taskiq/cli/scheduler/run.py skips cron ticks for any schedule_id whose previous send task is still in running_schedules. If a send task ever hangs (e.g., broker.kick blocks on a wedged Redis/Sentinel socket), the add_done_callback never fires, the entry stays in running_schedules forever, and every subsequent tick for that schedule is silently dropped — no error, no log, no recovery. The scheduler's main loop continues to iterate normally.
We hit this in production on taskiq==0.11.20 with taskiq-redis==1.0.x (ListQueueSentinelBroker) and verified the same logic is unchanged at HEAD (0.12.4).
Reproduction (conceptual)
@broker.task(schedule=[{"cron": "*/5 * * * *"}])
async def my_periodic_task() -> None:
    ...
- Scheduler tick fires → spawns send_task = loop.create_task(send(scheduler, source, task)).
- send awaits scheduler.on_ready → broker.kicker().kiq(...) → broker.kick(...).
- broker.kick hits a stalled connection (sentinel failover, network blip without TCP RST, kernel half-open socket).
- kick never returns. send_task never completes. The done_callback never runs. running_schedules[task.schedule_id] is permanently retained.
- On every subsequent main-loop iteration, the check at taskiq/cli/scheduler/run.py:337 (HEAD) evaluates to False for this task:
    if is_ready_to_send and task.schedule_id not in running_schedules:
  so no new send_task is spawned. The cron is silently skipped until the process is restarted.
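The steps above can be imitated without taskiq or Redis. The following is a minimal sketch, not the scheduler's actual code: hung_send and simulate are hypothetical names, and the hang is modelled by awaiting an asyncio.Event that is never set. It only mirrors the "skip if still in running_schedules" bookkeeping described above.

```python
import asyncio


async def hung_send() -> None:
    # Hypothetical stand-in for send(): waits on an event that is never set,
    # like a kick blocked on a stalled socket.
    await asyncio.Event().wait()


async def simulate(ticks: int) -> tuple[int, int]:
    """Return (spawned, skipped) after `ticks` simulated scheduler ticks."""
    running_schedules: dict[str, asyncio.Task[None]] = {}
    spawned = skipped = 0
    schedule_id = "demo"

    for _ in range(ticks):
        if schedule_id not in running_schedules:
            send_task = asyncio.create_task(
                hung_send(),
                name=f"schedule_{schedule_id}",
            )
            running_schedules[schedule_id] = send_task
            # The only way out of the dict is this callback, which needs
            # the task to finish.
            send_task.add_done_callback(
                lambda fut: running_schedules.pop(
                    fut.get_name().removeprefix("schedule_"), None
                ),
            )
            spawned += 1
        else:
            # The tick is dropped with no log and no error.
            skipped += 1
        await asyncio.sleep(0)  # give the event loop a turn, as the real loop does

    # Clean up the hung task so the demo exits.
    tasks = list(running_schedules.values())
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return spawned, skipped


if __name__ == "__main__":
    print(asyncio.run(simulate(5)))
```

With five ticks, one send task is spawned and the remaining four ticks are skipped, which is the silent-skip state described above.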
Observed in production
taskiq scheduler ... process while wedged:
- Event loop alive (epoll_wait, _run_once timer advances every minute per py-spy)
- Zero log output
- All scheduled tasks stopped firing simultaneously after one initial successful burst at startup
- Workers consuming non-scheduled traffic normally (worker process is separate)
- Recovery requires a manual pod restart
Root cause
taskiq/cli/scheduler/run.py (0.12.4 HEAD), inside SchedulerLoop.run:
running_schedules: dict[ScheduleId, asyncio.Task[Any]] = {}
...
for source, task_list in self.scheduled_tasks:
    for task in task_list:
        is_ready_to_send: bool = self._is_schedule_ready_to_send(task=task, now=now)
        if is_ready_to_send and task.schedule_id not in running_schedules:
            send_task = self._event_loop.create_task(
                send(self.scheduler, source, task),
                name=f"schedule_{task.schedule_id}",
            )
            running_schedules[task.schedule_id] = send_task
            send_task.add_done_callback(
                lambda task_future: running_schedules.pop(
                    task_future.get_name().removeprefix("schedule_"),
                ),
            )
This is the same pattern as 0.11.20's delayed_send / running_schedules. There is no timeout on send_task, no eviction policy, and no log or metric when one runs past a threshold. The only path out of running_schedules is the done_callback, which requires send_task to complete.
Why user-level wrappers don't fix this from the outside
We tried subclassing TaskiqScheduler and overriding on_ready with asyncio.wait_for(super().on_ready(...), timeout=...). py-spy showed the asyncio loop was alive and iterating, but our wrapper never engaged for the hung tick, because the scheduler tracks the send_task itself, not the inner on_ready call. Even if wait_for inside on_ready is meant to cancel a wedged broker kick, the cancellation only takes effect once the inner code yields, which it doesn't reliably do for a stalled socket connection.
The only place a timeout works reliably is on the send_task itself — and that's owned by the scheduler.
Proposed fix
Add an optional send_timeout: float | None to SchedulerLoop.run (and a --send-timeout CLI argument on SchedulerArgs). When set, the spawned send_task is wrapped in asyncio.wait_for(send(...), timeout=send_timeout). On timeout, log a warning and return cleanly so the done_callback clears running_schedules and the next tick can re-fire.
Default: None (no timeout — preserves existing behavior, opt-in only).
PR follows. Happy to iterate on the API shape if maintainers prefer a different name / default / mechanism.
Environment
- taskiq: 0.11.20 (in production); confirmed unchanged in HEAD 0.12.4
- taskiq-redis: 1.0.x (ListQueueSentinelBroker)
- Python: 3.12
- Deployed in Kubernetes; the trigger correlates with redis-sentinel pod restarts. When one sentinel node goes down for ~10s, the broker's existing socket goes stale silently, the next kick hangs, and the scheduler enters the silent-skip state.