Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions taskiq/scheduler/scheduled_task/cron_spec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
from collections.abc import Callable
from datetime import timedelta
from typing import Any

from pydantic import BaseModel

from taskiq.compat import IS_PYDANTIC2
from taskiq.scheduler.scheduled_task.validators import parse_cron_offset

_offset_before_validator: Callable[..., Any]
if IS_PYDANTIC2:
from pydantic import field_validator

_offset_before_validator = field_validator("offset", mode="before")
else:
from pydantic import validator

_offset_before_validator = validator("offset", pre=True)


class CronSpec(BaseModel):
"""Cron specification for running tasks."""
Expand All @@ -14,6 +29,11 @@ class CronSpec(BaseModel):

offset: str | timedelta | None = None

@_offset_before_validator
@classmethod
def _parse_offset(cls, value: Any) -> Any:
return parse_cron_offset(value)

def to_cron(self) -> str: # pragma: no cover
"""Converts cron spec to cron string."""
return f"{self.minutes} {self.hours} {self.days} {self.months} {self.weekdays}"
12 changes: 10 additions & 2 deletions taskiq/scheduler/scheduled_task/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
from datetime import datetime, timedelta
from typing import Any

from pydantic import BaseModel, Field, root_validator
from pydantic import BaseModel, Field, root_validator, validator

from taskiq.scheduler.scheduled_task.validators import validate_interval_value
from taskiq.scheduler.scheduled_task.validators import (
parse_cron_offset,
validate_interval_value,
)


class ScheduledTask(BaseModel):
Expand All @@ -21,6 +24,11 @@ class ScheduledTask(BaseModel):
time: datetime | None = None
interval: int | timedelta | None = None

@validator("cron_offset", pre=True)
@classmethod
def _parse_cron_offset(cls, value: Any) -> Any:
return parse_cron_offset(value)

@root_validator(pre=False) # type: ignore
@classmethod
def __check(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand Down
12 changes: 10 additions & 2 deletions taskiq/scheduler/scheduled_task/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from datetime import datetime, timedelta
from typing import Any

from pydantic import BaseModel, Field, model_validator
from pydantic import BaseModel, Field, field_validator, model_validator

from taskiq.scheduler.scheduled_task.validators import validate_interval_value
from taskiq.scheduler.scheduled_task.validators import (
parse_cron_offset,
validate_interval_value,
)

if sys.version_info >= (3, 11):
from typing import Self
Expand All @@ -27,6 +30,11 @@ class ScheduledTask(BaseModel):
time: datetime | None = None
interval: int | timedelta | None = None

@field_validator("cron_offset", mode="before")
@classmethod
def _parse_cron_offset(cls, value: Any) -> Any:
return parse_cron_offset(value)

@model_validator(mode="after")
def __check(self) -> Self:
"""Validate.
Expand Down
27 changes: 27 additions & 0 deletions taskiq/scheduler/scheduled_task/validators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
from datetime import timedelta
from typing import Any

from pydantic import ValidationError

from taskiq.compat import parse_obj_as


def parse_cron_offset(value: Any) -> Any:
"""Restore a ``timedelta`` cron offset from its serialized form.

pydantic serializes a ``timedelta`` offset to an ISO-8601 duration
string (e.g. ``"PT4H"``). Since the offset field is typed as
``str | timedelta``, on reload the value stays a ``str``, which then
breaks timezone handling in the scheduler (``ZoneInfo("PT4H")``).

Parse such duration strings back to ``timedelta`` while leaving genuine
timezone names (e.g. ``"US/Eastern"``) and other values untouched.

:param value: raw offset value coming from validation.
:return: a ``timedelta`` for serialized durations, otherwise ``value``.
"""
if not isinstance(value, str):
return value
try:
return parse_obj_as(timedelta, value)
except ValidationError:
return value


def validate_interval_value(
Expand Down
53 changes: 53 additions & 0 deletions tests/scheduler/test_cron_offset_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Regression tests for #605.

A ``timedelta`` offset is serialized by pydantic to an ISO-8601 duration
string (e.g. ``"PT4H"``). On reload the ``str | timedelta`` union kept it as
``str``, which then broke timezone handling in the scheduler
(``ZoneInfo("PT4H")``). The offset must round-trip back to ``timedelta``
while genuine timezone names stay untouched.
"""

from datetime import timedelta

from taskiq.compat import model_dump, model_validate
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask


def test_cron_spec_timedelta_offset_roundtrip() -> None:
offset = timedelta(hours=4)
restored = model_validate(CronSpec, model_dump(CronSpec(offset=offset)))
assert restored.offset == offset
assert isinstance(restored.offset, timedelta)


def test_cron_spec_timezone_name_offset_preserved() -> None:
restored = model_validate(CronSpec, model_dump(CronSpec(offset="US/Eastern")))
assert restored.offset == "US/Eastern"


def test_scheduled_task_timedelta_cron_offset_roundtrip() -> None:
offset = timedelta(hours=2)
task = ScheduledTask(
task_name="a",
labels={},
args=[],
kwargs={},
cron="* * * * *",
cron_offset=offset,
)
restored = model_validate(ScheduledTask, model_dump(task))
assert restored.cron_offset == offset
assert isinstance(restored.cron_offset, timedelta)


def test_scheduled_task_timezone_name_cron_offset_preserved() -> None:
task = ScheduledTask(
task_name="a",
labels={},
args=[],
kwargs={},
cron="* * * * *",
cron_offset="US/Eastern",
)
restored = model_validate(ScheduledTask, model_dump(task))
assert restored.cron_offset == "US/Eastern"
Loading