From f0ac543c6c4b4876daff50ffd75243dff7f17771 Mon Sep 17 00:00:00 2001 From: Sergei Frangulov Date: Mon, 18 May 2026 22:39:44 +0400 Subject: [PATCH] fix: round-trip timedelta cron offset (#605) pydantic serializes a timedelta offset to an ISO-8601 duration string (e.g. "PT4H"). Because the offset fields are typed as str | timedelta, re-validating kept the value as a str, so the scheduler later passed it to ZoneInfo("PT4H") and the schedule broke. Add a shared parse_cron_offset validator that restores a timedelta from its serialized duration form while leaving genuine timezone names (e.g. "US/Eastern") and other values untouched. Apply it as a before-validator on CronSpec.offset and ScheduledTask.cron_offset (pydantic v1 and v2). Closes #605 --- taskiq/scheduler/scheduled_task/cron_spec.py | 20 +++++++ taskiq/scheduler/scheduled_task/v1.py | 12 ++++- taskiq/scheduler/scheduled_task/v2.py | 12 ++++- taskiq/scheduler/scheduled_task/validators.py | 27 ++++++++++ tests/scheduler/test_cron_offset_roundtrip.py | 53 +++++++++++++++++++ 5 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 tests/scheduler/test_cron_offset_roundtrip.py diff --git a/taskiq/scheduler/scheduled_task/cron_spec.py b/taskiq/scheduler/scheduled_task/cron_spec.py index 781eba25..72a1274d 100644 --- a/taskiq/scheduler/scheduled_task/cron_spec.py +++ b/taskiq/scheduler/scheduled_task/cron_spec.py @@ -1,7 +1,22 @@ +from collections.abc import Callable from datetime import timedelta +from typing import Any from pydantic import BaseModel +from taskiq.compat import IS_PYDANTIC2 +from taskiq.scheduler.scheduled_task.validators import parse_cron_offset + +_offset_before_validator: Callable[..., Any] +if IS_PYDANTIC2: + from pydantic import field_validator + + _offset_before_validator = field_validator("offset", mode="before") +else: + from pydantic import validator + + _offset_before_validator = validator("offset", pre=True) + class CronSpec(BaseModel): """Cron specification for running tasks.""" @@ -14,6 +29,11 @@ class CronSpec(BaseModel): offset: str | timedelta | None = None + @_offset_before_validator + @classmethod + def _parse_offset(cls, value: Any) -> Any: + return parse_cron_offset(value) + def to_cron(self) -> str: # pragma: no cover """Converts cron spec to cron string.""" return f"{self.minutes} {self.hours} {self.days} {self.months} {self.weekdays}" diff --git a/taskiq/scheduler/scheduled_task/v1.py b/taskiq/scheduler/scheduled_task/v1.py index cc4e3c5f..bd8d15de 100644 --- a/taskiq/scheduler/scheduled_task/v1.py +++ b/taskiq/scheduler/scheduled_task/v1.py @@ -2,9 +2,12 @@ from datetime import datetime, timedelta from typing import Any -from pydantic import BaseModel, Field, root_validator +from pydantic import BaseModel, Field, root_validator, validator -from taskiq.scheduler.scheduled_task.validators import validate_interval_value +from taskiq.scheduler.scheduled_task.validators import ( + parse_cron_offset, + validate_interval_value, +) class ScheduledTask(BaseModel): @@ -21,6 +24,11 @@ class ScheduledTask(BaseModel): time: datetime | None = None interval: int | timedelta | None = None + @validator("cron_offset", pre=True) + @classmethod + def _parse_cron_offset(cls, value: Any) -> Any: + return parse_cron_offset(value) + @root_validator(pre=False) # type: ignore @classmethod def __check(cls, values: dict[str, Any]) -> dict[str, Any]: diff --git a/taskiq/scheduler/scheduled_task/v2.py b/taskiq/scheduler/scheduled_task/v2.py index 722d225f..b284fff2 100644 --- a/taskiq/scheduler/scheduled_task/v2.py +++ b/taskiq/scheduler/scheduled_task/v2.py @@ -3,9 +3,12 @@ from datetime import datetime, timedelta from typing import Any -from pydantic import BaseModel, Field, model_validator +from pydantic import BaseModel, Field, field_validator, model_validator -from taskiq.scheduler.scheduled_task.validators import validate_interval_value +from taskiq.scheduler.scheduled_task.validators import ( + parse_cron_offset, + validate_interval_value, +) if sys.version_info >= (3, 11): from typing import Self @@ -27,6 +30,11 @@ class ScheduledTask(BaseModel): time: datetime | None = None interval: int | timedelta | None = None + @field_validator("cron_offset", mode="before") + @classmethod + def _parse_cron_offset(cls, value: Any) -> Any: + return parse_cron_offset(value) + @model_validator(mode="after") def __check(self) -> Self: """Validate. diff --git a/taskiq/scheduler/scheduled_task/validators.py b/taskiq/scheduler/scheduled_task/validators.py index 05e51849..b40abaa7 100644 --- a/taskiq/scheduler/scheduled_task/validators.py +++ b/taskiq/scheduler/scheduled_task/validators.py @@ -1,4 +1,31 @@ from datetime import timedelta +from typing import Any + +from pydantic import ValidationError + +from taskiq.compat import parse_obj_as + + +def parse_cron_offset(value: Any) -> Any: + """Restore a ``timedelta`` cron offset from its serialized form. + + pydantic serializes a ``timedelta`` offset to an ISO-8601 duration + string (e.g. ``"PT4H"``). Since the offset field is typed as + ``str | timedelta``, on reload the value stays a ``str``, which then + breaks timezone handling in the scheduler (``ZoneInfo("PT4H")``). + + Parse such duration strings back to ``timedelta`` while leaving genuine + timezone names (e.g. ``"US/Eastern"``) and other values untouched. + + :param value: raw offset value coming from validation. + :return: a ``timedelta`` for serialized durations, otherwise ``value``. + """ + if not isinstance(value, str): + return value + try: + return parse_obj_as(timedelta, value) + except ValidationError: + return value def validate_interval_value( diff --git a/tests/scheduler/test_cron_offset_roundtrip.py b/tests/scheduler/test_cron_offset_roundtrip.py new file mode 100644 index 00000000..37e2d9a7 --- /dev/null +++ b/tests/scheduler/test_cron_offset_roundtrip.py @@ -0,0 +1,53 @@ +"""Regression tests for #605. + +A ``timedelta`` offset is serialized by pydantic to an ISO-8601 duration +string (e.g. ``"PT4H"``). On reload the ``str | timedelta`` union kept it as +``str``, which then broke timezone handling in the scheduler +(``ZoneInfo("PT4H")``). The offset must round-trip back to ``timedelta`` +while genuine timezone names stay untouched. +""" + +from datetime import timedelta + +from taskiq.compat import model_dump, model_validate +from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask + + +def test_cron_spec_timedelta_offset_roundtrip() -> None: + offset = timedelta(hours=4) + restored = model_validate(CronSpec, model_dump(CronSpec(offset=offset))) + assert restored.offset == offset + assert isinstance(restored.offset, timedelta) + + +def test_cron_spec_timezone_name_offset_preserved() -> None: + restored = model_validate(CronSpec, model_dump(CronSpec(offset="US/Eastern"))) + assert restored.offset == "US/Eastern" + + +def test_scheduled_task_timedelta_cron_offset_roundtrip() -> None: + offset = timedelta(hours=2) + task = ScheduledTask( + task_name="a", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + cron_offset=offset, + ) + restored = model_validate(ScheduledTask, model_dump(task)) + assert restored.cron_offset == offset + assert isinstance(restored.cron_offset, timedelta) + + +def test_scheduled_task_timezone_name_cron_offset_preserved() -> None: + task = ScheduledTask( + task_name="a", + labels={}, + args=[], + kwargs={}, + cron="* * * * *", + cron_offset="US/Eastern", + ) + restored = model_validate(ScheduledTask, model_dump(task)) + assert restored.cron_offset == "US/Eastern"