Scheduler silently stops dispatching cron tasks if a send task hangs #626

@Rohit-Ekbote

Summary

SchedulerLoop.run in taskiq/cli/scheduler/run.py skips cron ticks for any schedule_id whose previous send task is still in running_schedules. If a send task ever hangs (e.g., broker.kick blocks on a wedged Redis/Sentinel socket), the add_done_callback never fires, the entry stays in running_schedules forever, and every subsequent tick for that schedule is silently dropped — no error, no log, no recovery. The scheduler's main loop continues to iterate normally.

We hit this in production on taskiq==0.11.20 with taskiq-redis==1.0.x (ListQueueSentinelBroker) and verified the same logic is unchanged at HEAD (0.12.4).

Reproduction (conceptual)

@broker.task(schedule=[{"cron": "*/5 * * * *"}])
async def my_periodic_task() -> None:
    ...
  1. Scheduler tick fires → spawns send_task = loop.create_task(send(scheduler, source, task)).
  2. send awaits scheduler.on_ready → broker.kicker().kiq(...) → broker.kick(...).
  3. broker.kick hits a stalled connection (sentinel failover, network blip without TCP RST, kernel half-open socket).
  4. kick never returns. send_task never completes. The done_callback never runs. running_schedules[task.schedule_id] is permanently retained.
  5. On every subsequent main-loop iteration, the condition at taskiq/cli/scheduler/run.py:337 (HEAD) evaluates to False for this task:
    if is_ready_to_send and task.schedule_id not in running_schedules:
    so no new send_task is spawned. The cron is silently skipped forever until process restart.
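
The steps above can be imitated without a broker. The following is a minimal sketch, not taskiq code: hanging_send is a hypothetical stand-in for send(...) that awaits an event nobody sets, and simulate_ticks is a hypothetical, stripped-down version of the dispatch check. It only illustrates that one hung task keeps its schedule_id in running_schedules, so every later tick is skipped.

```python
import asyncio
from typing import Any


async def hanging_send() -> None:
    """Hypothetical stand-in for send(...): waits on an event that is never set."""
    await asyncio.Event().wait()


async def simulate_ticks(ticks: int) -> tuple[int, int]:
    """Run `ticks` iterations of the dispatch check; return (spawned, skipped)."""
    loop = asyncio.get_running_loop()
    running_schedules: dict[str, asyncio.Task[Any]] = {}
    schedule_id = "my_periodic_task"
    spawned = 0
    skipped = 0

    for _ in range(ticks):
        # Same guard as in the issue: dispatch only if nothing is tracked.
        if schedule_id not in running_schedules:
            send_task = loop.create_task(
                hanging_send(),
                name=f"schedule_{schedule_id}",
            )
            running_schedules[schedule_id] = send_task
            send_task.add_done_callback(
                lambda fut: running_schedules.pop(
                    fut.get_name().removeprefix("schedule_"),
                ),
            )
            spawned += 1
        else:
            skipped += 1
        # Yield to the event loop so the spawned task gets a chance to run.
        await asyncio.sleep(0)

    # Clean up the hung task so the event loop can shut down quietly.
    pending = list(running_schedules.values())
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return spawned, skipped


if __name__ == "__main__":
    print(asyncio.run(simulate_ticks(5)))
```

In this simulation only the first tick dispatches; the remaining ticks fall through the guard without any error or log line, which matches the behaviour described in step 5.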

Observed in production

taskiq scheduler ... process while wedged:

  • Event loop alive (epoll_wait, _run_once timer advances every minute per py-spy)
  • Zero log output
  • All scheduled tasks stopped firing simultaneously after one initial successful burst at startup
  • Workers consuming non-scheduled traffic normally (worker process is separate)
  • Recovery requires a manual pod restart

Root cause

taskiq/cli/scheduler/run.py (0.12.4 HEAD), inside SchedulerLoop.run:

running_schedules: dict[ScheduleId, asyncio.Task[Any]] = {}
...
for source, task_list in self.scheduled_tasks:
    for task in task_list:
        is_ready_to_send: bool = self._is_schedule_ready_to_send(task=task, now=now)
        if is_ready_to_send and task.schedule_id not in running_schedules:
            send_task = self._event_loop.create_task(
                send(self.scheduler, source, task),
                name=f"schedule_{task.schedule_id}",
            )
            running_schedules[task.schedule_id] = send_task
            send_task.add_done_callback(
                lambda task_future: running_schedules.pop(
                    task_future.get_name().removeprefix("schedule_"),
                ),
            )

Same pattern as 0.11.20's delayed_send / running_schedules. There is no timeout on send_task, no eviction policy, no observability when one runs past a threshold. The only path out of running_schedules is the done_callback, which requires send_task to complete.
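
To illustrate the missing observability, here is a rough sketch of what a threshold check could look like. Nothing here exists in taskiq: started_at is a hypothetical companion dict to running_schedules that would record time.monotonic() at the moment each send_task is spawned, and overdue_schedules is a hypothetical helper the main loop could call once per iteration to log the offenders.

```python
import time
from typing import Optional


def overdue_schedules(
    started_at: dict[str, float],
    threshold: float,
    now: Optional[float] = None,
) -> list[str]:
    """Return schedule ids whose send task has been in flight longer than `threshold` seconds.

    `started_at` maps schedule_id -> time.monotonic() at spawn time (hypothetical bookkeeping).
    """
    current = time.monotonic() if now is None else now
    return sorted(
        schedule_id
        for schedule_id, started in started_at.items()
        if current - started > threshold
    )


if __name__ == "__main__":
    # "a" has been in flight for 100s, "b" for 5s; only "a" exceeds 30s.
    print(overdue_schedules({"a": 0.0, "b": 95.0}, threshold=30.0, now=100.0))
```

A check like this would not unblock the schedule by itself, but it would turn the silent state into a visible warning.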

Why user-level wrappers don't fix this from the outside

We tried subclassing TaskiqScheduler and overriding on_ready with asyncio.wait_for(super().on_ready(...), timeout=...). py-spy showed the asyncio loop was alive and iterating, but our wrapper never engaged for the hung tick, because the scheduler tracks the send_task itself, not the inner on_ready call. For wait_for inside on_ready to cancel a wedged broker kick, the inner code has to yield, which it doesn't reliably do for a stalled socket connection.

The only place a timeout works reliably is on the send_task itself — and that's owned by the scheduler.

Proposed fix

Add an optional send_timeout: float | None to SchedulerLoop.run (and a --send-timeout CLI argument on SchedulerArgs). When set, the spawned send_task is wrapped in asyncio.wait_for(send(...), timeout=send_timeout). On timeout, log a warning and return cleanly so the done_callback clears running_schedules and the next tick can re-fire.

Default: None (no timeout — preserves existing behavior, opt-in only).
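
A minimal sketch of the intended behaviour, assuming nothing about the final API: send_with_timeout is a hypothetical wrapper, and hanging_send / quick_send are hypothetical stand-ins for send(...). The real change would live inside SchedulerLoop.run; this only shows the wait_for, log, and clean-return shape.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Optional

logger = logging.getLogger("scheduler_sketch")


async def send_with_timeout(
    send: Callable[[], Awaitable[None]],
    schedule_id: str,
    send_timeout: Optional[float],
) -> bool:
    """Await send(); return True if it finished, False if it timed out."""
    if send_timeout is None:
        # Opt-in only: without a timeout the existing behaviour is unchanged.
        await send()
        return True
    try:
        await asyncio.wait_for(send(), timeout=send_timeout)
    except asyncio.TimeoutError:
        # Returning normally lets the done callback clear running_schedules.
        logger.warning(
            "send for schedule %s exceeded %.2fs; giving up on this tick",
            schedule_id,
            send_timeout,
        )
        return False
    return True


async def hanging_send() -> None:
    """Hypothetical stand-in for a send that never completes."""
    await asyncio.Event().wait()


async def quick_send() -> None:
    """Hypothetical stand-in for a send that completes immediately."""
    return None


if __name__ == "__main__":
    print(asyncio.run(send_with_timeout(hanging_send, "demo", 0.05)))
    print(asyncio.run(send_with_timeout(quick_send, "demo", 0.05)))
```

One caveat: asyncio.wait_for cancels the inner awaitable on timeout and then waits for that cancellation to complete, so the total wait can exceed send_timeout if the inner code is slow to react to cancellation.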

PR follows. Happy to iterate on the API shape if maintainers prefer a different name / default / mechanism.

Environment

  • taskiq: 0.11.20 (in production); confirmed unchanged in HEAD 0.12.4
  • taskiq-redis: 1.0.x (ListQueueSentinelBroker)
  • Python: 3.12
  • Deployed in Kubernetes; trigger correlates with redis-sentinel pod restarts. When one sentinel node goes down for ~10s, the broker's existing socket goes stale silently, the next kick hangs, and the scheduler enters the silent-skip state.
