From 422b50a7515606646a710218feb1fcf182e1873e Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sat, 28 Feb 2026 14:58:24 +0000 Subject: [PATCH 01/13] add pgcron + scheduling primitives to Durable --- src/client.rs | 4 +- src/cron.rs | 765 ++++++++++++++++++ src/error.rs | 25 + src/lib.rs | 2 + .../20260228000000_add_cron_schedules.sql | 23 + tests/cron_test.rs | 489 +++++++++++ 6 files changed, 1306 insertions(+), 2 deletions(-) create mode 100644 src/cron.rs create mode 100644 src/postgres/migrations/20260228000000_add_cron_schedules.sql create mode 100644 tests/cron_test.rs diff --git a/src/client.rs b/src/client.rs index ad8d8a2..c789a7b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -55,7 +55,7 @@ impl CancellationPolicyDb { use crate::worker::Worker; /// Validates that user-provided headers don't use reserved prefixes. -fn validate_headers(headers: &Option>) -> DurableResult<()> { +pub(crate) fn validate_headers(headers: &Option>) -> DurableResult<()> { if let Some(headers) = headers { for key in headers.keys() { if key.starts_with("durable::") { @@ -578,7 +578,7 @@ where }) } - fn serialize_spawn_options( + pub(crate) fn serialize_spawn_options( options: &SpawnOptions, max_attempts: u32, ) -> serde_json::Result { diff --git a/src/cron.rs b/src/cron.rs new file mode 100644 index 0000000..efaf18d --- /dev/null +++ b/src/cron.rs @@ -0,0 +1,765 @@ +use chrono::{DateTime, Utc}; +use serde_json::Value as JsonValue; +use sqlx::{PgPool, Postgres, QueryBuilder}; +use std::collections::HashMap; + +use crate::client::{Durable, validate_headers}; +use crate::error::{DurableError, DurableResult}; +use crate::types::SpawnOptions; + +/// Options for creating a cron schedule. +#[derive(Debug, Clone)] +pub struct ScheduleOptions { + /// The task name to spawn on each cron tick. + pub task_name: String, + /// Standard 5-field cron expression (minute hour day-of-month month day-of-week). + pub cron_expression: String, + /// Parameters to pass to the spawned task (serialized as JSON). + pub params: JsonValue, + /// Spawn options (max_attempts, retry_strategy, cancellation, headers). + pub spawn_options: SpawnOptions, + /// Arbitrary user-defined metadata for categorization/filtering. + pub metadata: Option>, +} + +/// Information about an existing cron schedule. +#[derive(Debug, Clone)] +pub struct ScheduleInfo { + /// The schedule name (unique within the queue). + pub name: String, + /// The cron expression. + pub cron_expression: String, + /// The task name that gets spawned. + pub task_name: String, + /// The parameters passed to the spawned task. + pub params: JsonValue, + /// The serialized spawn options. + pub spawn_options: JsonValue, + /// User-defined metadata. + pub metadata: HashMap, + /// The pg_cron job name. + pub pgcron_job_name: String, + /// When the schedule was created. + pub created_at: DateTime, + /// When the schedule was last updated. + pub updated_at: DateTime, +} + +/// Filter for listing schedules. +#[derive(Debug, Clone, Default)] +pub struct ScheduleFilter { + /// Filter by task name (exact match). + pub task_name: Option, + /// Filter by metadata (JSONB `@>` containment). + /// e.g. `{"team": "payments"}` matches schedules whose metadata contains that key-value. + pub metadata: Option>, +} + +/// Set up the pg_cron extension in the database. +/// +/// Attempts to create the extension and verifies it is available. +/// Call this once during application startup before using cron scheduling. +/// +/// # Errors +/// +/// Returns [`DurableError::PgCronUnavailable`] if the extension cannot be created +/// or is not available. +pub async fn setup_pgcron(pool: &PgPool) -> DurableResult<()> { + // Attempt to create the extension, ignoring errors (user may not have privileges) + let _ = sqlx::query( + "DO $$ BEGIN CREATE EXTENSION IF NOT EXISTS pg_cron; EXCEPTION WHEN OTHERS THEN NULL; END $$" + ) + .execute(pool) + .await; + + // Verify it exists + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(pool) + .await?; + + if !exists.0 { + return Err(DurableError::PgCronUnavailable { + reason: "pg_cron extension is not installed and could not be created".to_string(), + }); + } + + Ok(()) +} + +impl Durable +where + State: Clone + Send + Sync + 'static, +{ + /// Create or update a cron schedule. + /// + /// If a schedule with the same name already exists in this queue, it will be updated + /// (upsert semantics). The pg_cron job is created/updated and the schedule metadata + /// is stored in `durable.cron_schedules`. + /// + /// # Arguments + /// + /// * `schedule_name` - Unique name for this schedule within the queue + /// (alphanumeric, hyphens, and underscores only). + /// * `options` - Schedule configuration including task name, cron expression, + /// params, spawn options, and metadata. + /// + /// # Errors + /// + /// Returns an error if: + /// - The schedule name is invalid + /// - The cron expression is invalid + /// - Headers use the reserved `durable::` prefix + /// - pg_cron is not available + pub async fn create_schedule( + &self, + schedule_name: &str, + options: ScheduleOptions, + ) -> DurableResult<()> { + // Validate inputs + validate_schedule_name(schedule_name)?; + validate_cron_expression(&options.cron_expression)?; + validate_headers(&options.spawn_options.headers)?; + + let pgcron_job_name = format!("durable_{}_{}", self.queue_name(), schedule_name); + + // Build spawn options with injected durable:: headers + let mut spawn_options = options.spawn_options.clone(); + let headers = spawn_options.headers.get_or_insert_with(HashMap::new); + headers.insert( + "durable::scheduled_by".to_string(), + JsonValue::String(schedule_name.to_string()), + ); + headers.insert( + "durable::cron".to_string(), + JsonValue::String(options.cron_expression.clone()), + ); + + let max_attempts = spawn_options.max_attempts.unwrap_or(5); + let db_options = Self::serialize_spawn_options(&spawn_options, max_attempts) + .map_err(DurableError::Serialization)?; + + // Build the SQL command that pg_cron will execute + let spawn_sql = build_pgcron_spawn_sql( + self.queue_name(), + &options.task_name, + &options.params, + &db_options, + ); + + let metadata_value = match &options.metadata { + Some(m) => serde_json::to_value(m).map_err(DurableError::Serialization)?, + None => serde_json::json!({}), + }; + + // Check pg_cron availability + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(self.pool()) + .await?; + + if !exists.0 { + return Err(DurableError::PgCronUnavailable { + reason: "pg_cron extension is not installed".to_string(), + }); + } + + // Execute in a transaction + let mut tx = self.pool().begin().await?; + + // Schedule the pg_cron job (has built-in upsert semantics) + sqlx::query("SELECT cron.schedule($1, $2, $3)") + .bind(&pgcron_job_name) + .bind(&options.cron_expression) + .bind(&spawn_sql) + .execute(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "create"))?; + + // Upsert into our schedule registry + sqlx::query( + "INSERT INTO durable.cron_schedules + (schedule_name, queue_name, task_name, cron_expression, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now()) + ON CONFLICT (queue_name, schedule_name) + DO UPDATE SET + task_name = EXCLUDED.task_name, + cron_expression = EXCLUDED.cron_expression, + params = EXCLUDED.params, + spawn_options = EXCLUDED.spawn_options, + metadata = EXCLUDED.metadata, + pgcron_job_name = EXCLUDED.pgcron_job_name, + updated_at = now()" + ) + .bind(schedule_name) + .bind(self.queue_name()) + .bind(&options.task_name) + .bind(&options.cron_expression) + .bind(&options.params) + .bind(&db_options) + .bind(&metadata_value) + .bind(&pgcron_job_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// Delete a cron schedule. + /// + /// Removes the pg_cron job and the schedule registry entry. Any in-flight + /// tasks that were already spawned by this schedule are not cancelled. + /// + /// # Errors + /// + /// Returns [`DurableError::ScheduleNotFound`] if the schedule does not exist. + pub async fn delete_schedule(&self, schedule_name: &str) -> DurableResult<()> { + // Look up the pgcron_job_name + let row: Option<(String,)> = sqlx::query_as( + "SELECT pgcron_job_name FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .fetch_optional(self.pool()) + .await?; + + let (pgcron_job_name,) = row.ok_or_else(|| DurableError::ScheduleNotFound { + schedule_name: schedule_name.to_string(), + queue_name: self.queue_name().to_string(), + })?; + + let mut tx = self.pool().begin().await?; + + // Look up the jobid from cron.job and unschedule it + let job_row: Option<(i64,)> = + sqlx::query_as("SELECT jobid FROM cron.job WHERE jobname = $1") + .bind(&pgcron_job_name) + .fetch_optional(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "delete"))?; + + if let Some((jobid,)) = job_row { + sqlx::query("SELECT cron.unschedule($1)") + .bind(jobid) + .execute(&mut *tx) + .await + .map_err(|e| map_pgcron_error(e, "delete"))?; + } + + // Delete from our registry + sqlx::query( + "DELETE FROM durable.cron_schedules + WHERE queue_name = $1 AND schedule_name = $2", + ) + .bind(self.queue_name()) + .bind(schedule_name) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + /// List cron schedules, optionally filtered. + /// + /// Returns all schedules for this queue. If a filter is provided, results + /// are narrowed by task name and/or metadata containment. + /// + /// This only queries the `durable.cron_schedules` table (no pg_cron queries), + /// so it works even if pg_cron is not installed. + pub async fn list_schedules( + &self, + filter: Option, + ) -> DurableResult> { + let filter = filter.unwrap_or_default(); + + let mut qb: QueryBuilder = QueryBuilder::new( + "SELECT schedule_name, cron_expression, task_name, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at + FROM durable.cron_schedules + WHERE queue_name = ", + ); + qb.push_bind(self.queue_name()); + + if let Some(ref task_name) = filter.task_name { + qb.push(" AND task_name = ").push_bind(task_name.clone()); + } + + if let Some(ref metadata) = filter.metadata { + let metadata_json = + serde_json::to_value(metadata).map_err(DurableError::Serialization)?; + qb.push(" AND metadata @> ") + .push_bind(metadata_json) + .push("::jsonb"); + } + + qb.push(" ORDER BY schedule_name"); + + let rows: Vec = qb + .build_query_as::() + .fetch_all(self.pool()) + .await?; + + rows.into_iter() + .map(|row| { + let metadata: HashMap = + serde_json::from_value(row.metadata).unwrap_or_default(); + Ok(ScheduleInfo { + name: row.schedule_name, + cron_expression: row.cron_expression, + task_name: row.task_name, + params: row.params, + spawn_options: row.spawn_options, + metadata, + pgcron_job_name: row.pgcron_job_name, + created_at: row.created_at, + updated_at: row.updated_at, + }) + }) + .collect() + } +} + +// --- Internal types --- + +#[derive(Debug, sqlx::FromRow)] +struct ScheduleRow { + schedule_name: String, + cron_expression: String, + task_name: String, + params: JsonValue, + spawn_options: JsonValue, + metadata: JsonValue, + pgcron_job_name: String, + created_at: DateTime, + updated_at: DateTime, +} + +// --- Validation helpers --- + +/// Validate a 5-field standard cron expression. +fn validate_cron_expression(expr: &str) -> DurableResult<()> { + let fields: Vec<&str> = expr.split_whitespace().collect(); + if fields.len() != 5 { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("expected 5 fields, got {}", fields.len()), + }); + } + + let field_names = ["minute", "hour", "day-of-month", "month", "day-of-week"]; + let field_ranges: [(u32, u32); 5] = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]; + + for (i, field) in fields.iter().enumerate() { + validate_cron_field( + expr, + field, + field_names[i], + field_ranges[i].0, + field_ranges[i].1, + )?; + } + + Ok(()) +} + +fn validate_cron_field( + expr: &str, + field: &str, + name: &str, + min: u32, + max: u32, +) -> DurableResult<()> { + if field.is_empty() { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} field is empty"), + }); + } + + // Split by comma for lists (e.g., "1,15,30") + for part in field.split(',') { + validate_cron_part(expr, part, name, min, max)?; + } + + Ok(()) +} + +fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> DurableResult<()> { + // Handle step values (e.g., "*/5" or "1-30/5") + let (range_part, step) = if let Some((range, step_str)) = part.split_once('/') { + let step: u32 = step_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid step value '{step_str}' in {name} field"), + })?; + if step == 0 { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("step value cannot be 0 in {name} field"), + }); + } + (range, Some(step)) + } else { + (part, None) + }; + + // Handle wildcard + if range_part == "*" { + return Ok(()); + } + + // Handle range (e.g., "1-30") + if let Some((start_str, end_str)) = range_part.split_once('-') { + let start: u32 = start_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid range start '{start_str}' in {name} field"), + })?; + let end: u32 = end_str + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid range end '{end_str}' in {name} field"), + })?; + + if start < min || start > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range start {start} out of range {min}-{max}"), + }); + } + if end < min || end > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range end {end} out of range {min}-{max}"), + }); + } + if start > end { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} range start {start} is greater than end {end}"), + }); + } + + return Ok(()); + } + + // Handle single value + if step.is_some() && range_part == "*" { + return Ok(()); + } + + let value: u32 = range_part + .parse() + .map_err(|_| DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("invalid value '{range_part}' in {name} field"), + })?; + + if value < min || value > max { + return Err(DurableError::InvalidCronExpression { + expression: expr.to_string(), + reason: format!("{name} value {value} out of range {min}-{max}"), + }); + } + + Ok(()) +} + +/// Validate a schedule name (alphanumeric, hyphens, underscores only; non-empty). +fn validate_schedule_name(name: &str) -> DurableResult<()> { + if name.is_empty() { + return Err(DurableError::InvalidCronExpression { + expression: String::new(), + reason: "schedule name cannot be empty".to_string(), + }); + } + + if !name + .chars() + .all(|c| c.is_alphanumeric() || c == '-' || c == '_') + { + return Err(DurableError::InvalidCronExpression { + expression: String::new(), + reason: format!( + "schedule name '{name}' contains invalid characters (only alphanumeric, hyphens, and underscores allowed)" + ), + }); + } + + Ok(()) +} + +// --- SQL escaping --- + +/// Dollar-quote a string using `$durable$` as the delimiter. +/// Falls back to escaped single quotes if the content contains `$durable$`. +fn pg_literal(s: &str) -> String { + if !s.contains("$durable$") { + format!("$durable${s}$durable$") + } else { + // Fallback: single-quote escaping (double up any single quotes) + format!("'{}'", s.replace('\'', "''")) + } +} + +/// Build the SQL command that pg_cron will execute to spawn a task. +fn build_pgcron_spawn_sql( + queue_name: &str, + task_name: &str, + params: &JsonValue, + spawn_options: &JsonValue, +) -> String { + let params_str = params.to_string(); + let options_str = spawn_options.to_string(); + + format!( + "SELECT durable.spawn_task({}, {}, {}::jsonb, {}::jsonb)", + pg_literal(queue_name), + pg_literal(task_name), + pg_literal(¶ms_str), + pg_literal(&options_str), + ) +} + +/// Map pgcron-related SQL errors to more descriptive error messages. +fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { + let err_str = err.to_string(); + if err_str.contains("cron") || err_str.contains("pg_cron") { + DurableError::PgCronUnavailable { + reason: format!("pg_cron error during {operation}: {err_str}"), + } + } else { + DurableError::Database(err) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // --- Cron expression validation tests --- + + #[test] + fn test_valid_cron_every_minute() { + assert!(validate_cron_expression("* * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_every_5_minutes() { + assert!(validate_cron_expression("*/5 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_range_with_step() { + assert!(validate_cron_expression("0-30/5 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_list() { + assert!(validate_cron_expression("1,15,30 * * * *").is_ok()); + } + + #[test] + fn test_valid_cron_specific_time() { + assert!(validate_cron_expression("0 9 * * 1").is_ok()); + } + + #[test] + fn test_valid_cron_complex() { + assert!(validate_cron_expression("30 2 1,15 * 0-5").is_ok()); + } + + #[test] + fn test_valid_cron_weekday_7() { + // 7 is valid for day-of-week (Sunday in some implementations) + assert!(validate_cron_expression("0 0 * * 7").is_ok()); + } + + #[test] + fn test_invalid_cron_too_few_fields() { + let result = validate_cron_expression("* * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("expected 5 fields")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_too_many_fields() { + let result = validate_cron_expression("* * * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("expected 5 fields")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_6_field_seconds() { + let result = validate_cron_expression("0 */5 * * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_minute() { + let result = validate_cron_expression("60 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("out of range")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_out_of_range_hour() { + let result = validate_cron_expression("0 24 * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_day() { + let result = validate_cron_expression("0 0 32 * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_month() { + let result = validate_cron_expression("0 0 * 13 *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_out_of_range_weekday() { + let result = validate_cron_expression("0 0 * * 8"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_empty() { + let result = validate_cron_expression(""); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_letters() { + let result = validate_cron_expression("abc * * * *"); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_cron_zero_step() { + let result = validate_cron_expression("*/0 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("step value cannot be 0")); + } + _ => panic!("unexpected error type"), + } + } + + #[test] + fn test_invalid_cron_reversed_range() { + let result = validate_cron_expression("30-10 * * * *"); + assert!(result.is_err()); + let err = result.unwrap_err(); + match err { + DurableError::InvalidCronExpression { reason, .. } => { + assert!(reason.contains("greater than end")); + } + _ => panic!("unexpected error type"), + } + } + + // --- pg_literal tests --- + + #[test] + fn test_pg_literal_simple() { + assert_eq!(pg_literal("hello"), "$durable$hello$durable$"); + } + + #[test] + fn test_pg_literal_with_single_quotes() { + assert_eq!(pg_literal("it's a test"), "$durable$it's a test$durable$"); + } + + #[test] + fn test_pg_literal_with_json() { + let json = r#"{"key": "value"}"#; + assert_eq!(pg_literal(json), format!("$durable${json}$durable$")); + } + + #[test] + fn test_pg_literal_fallback_on_delimiter_conflict() { + let content = "contains $durable$ in it"; + assert_eq!(pg_literal(content), "'contains $durable$ in it'"); + } + + #[test] + fn test_pg_literal_fallback_escapes_quotes() { + let content = "has $durable$ and 'quotes'"; + assert_eq!(pg_literal(content), "'has $durable$ and ''quotes'''"); + } + + // --- Schedule name validation tests --- + + #[test] + fn test_valid_schedule_names() { + assert!(validate_schedule_name("my-schedule").is_ok()); + assert!(validate_schedule_name("task_1").is_ok()); + assert!(validate_schedule_name("DailyReport").is_ok()); + assert!(validate_schedule_name("a").is_ok()); + assert!(validate_schedule_name("test-123_abc").is_ok()); + } + + #[test] + fn test_invalid_schedule_name_empty() { + assert!(validate_schedule_name("").is_err()); + } + + #[test] + fn test_invalid_schedule_name_spaces() { + assert!(validate_schedule_name("my schedule").is_err()); + } + + #[test] + fn test_invalid_schedule_name_semicolons() { + assert!(validate_schedule_name("drop;table").is_err()); + } + + #[test] + fn test_invalid_schedule_name_special_chars() { + assert!(validate_schedule_name("name@here").is_err()); + assert!(validate_schedule_name("name.here").is_err()); + assert!(validate_schedule_name("name/here").is_err()); + } + + // --- build_pgcron_spawn_sql tests --- + + #[test] + fn test_build_pgcron_spawn_sql() { + let params = serde_json::json!({"key": "value"}); + let options = serde_json::json!({"max_attempts": 3}); + let sql = build_pgcron_spawn_sql("my_queue", "my_task", ¶ms, &options); + assert!(sql.contains("durable.spawn_task")); + assert!(sql.contains("my_queue")); + assert!(sql.contains("my_task")); + assert!(sql.contains("::jsonb")); + } +} diff --git a/src/error.rs b/src/error.rs index 7c4a20d..e79bdbd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -437,6 +437,31 @@ pub enum DurableError { /// The unrecognized state string. state: String, }, + + /// pg_cron extension is not available in the database. + #[error("pg_cron is not available: {reason}")] + PgCronUnavailable { + /// Why pg_cron is not available. + reason: String, + }, + + /// Cron expression failed validation. + #[error("invalid cron expression '{expression}': {reason}")] + InvalidCronExpression { + /// The invalid cron expression. + expression: String, + /// Why the expression is invalid. + reason: String, + }, + + /// Schedule not found. + #[error("schedule '{schedule_name}' not found in queue '{queue_name}'")] + ScheduleNotFound { + /// The schedule name that was not found. + schedule_name: String, + /// The queue name that was searched. + queue_name: String, + }, } /// Result type alias for Client API operations. diff --git a/src/lib.rs b/src/lib.rs index eef201c..fed9b2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ mod client; mod context; +mod cron; mod error; mod task; #[cfg(feature = "telemetry")] @@ -106,6 +107,7 @@ mod worker; // Re-export public API pub use client::{Durable, DurableBuilder}; pub use context::TaskContext; +pub use cron::{ScheduleFilter, ScheduleInfo, ScheduleOptions, setup_pgcron}; pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult}; pub use task::{ErasedTask, Task, TaskWrapper}; pub use types::{ diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql new file mode 100644 index 0000000..376a427 --- /dev/null +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -0,0 +1,23 @@ +-- Cron schedule registry table. +-- Stores metadata for schedules managed by pg_cron via the Durable client API. +-- This table always exists (even without pg_cron installed), so list_schedules() works regardless. + +CREATE TABLE IF NOT EXISTS durable.cron_schedules ( + schedule_name TEXT NOT NULL, + queue_name TEXT NOT NULL, + task_name TEXT NOT NULL, + cron_expression TEXT NOT NULL, + params JSONB NOT NULL DEFAULT '{}'::jsonb, + spawn_options JSONB NOT NULL DEFAULT '{}'::jsonb, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + pgcron_job_name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (queue_name, schedule_name) +); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_metadata + ON durable.cron_schedules USING gin (metadata); + +CREATE INDEX IF NOT EXISTS idx_cron_schedules_task_name + ON durable.cron_schedules (queue_name, task_name); diff --git a/tests/cron_test.rs b/tests/cron_test.rs new file mode 100644 index 0000000..edca162 --- /dev/null +++ b/tests/cron_test.rs @@ -0,0 +1,489 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)] + +mod common; + +use durable::{ + Durable, DurableError, MIGRATOR, ScheduleFilter, ScheduleOptions, SpawnOptions, setup_pgcron, +}; +use serde_json::json; +use sqlx::PgPool; +use std::collections::HashMap; + +/// Check if pg_cron is available in this database. Tests that require pg_cron +/// should call this and return early if it's not installed. +async fn pgcron_available(pool: &PgPool) -> bool { + let result: Result<(bool,), _> = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(pool) + .await; + match result { + Ok((exists,)) => exists, + Err(_) => false, + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_setup_pgcron(pool: PgPool) { + // If pg_cron extension is available, setup should succeed + // If not, it should return PgCronUnavailable + let result = setup_pgcron(&pool).await; + if pgcron_available(&pool).await { + result.unwrap(); + } else { + let err = result.unwrap_err(); + assert!(matches!(err, DurableError::PgCronUnavailable { .. })); + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_and_list_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_create_and_list_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_create_list") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let mut metadata = HashMap::new(); + metadata.insert("team".to_string(), json!("payments")); + metadata.insert("env".to_string(), json!("production")); + + let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(metadata), + }; + + durable + .create_schedule("payment-schedule", options) + .await + .unwrap(); + + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + let schedule = &schedules[0]; + assert_eq!(schedule.name, "payment-schedule"); + assert_eq!(schedule.cron_expression, "*/5 * * * *"); + assert_eq!(schedule.task_name, "process-payments"); + assert_eq!(schedule.params, json!({"batch_size": 100})); + assert_eq!(schedule.metadata.get("team"), Some(&json!("payments"))); + assert_eq!(schedule.metadata.get("env"), Some(&json!("production"))); + assert_eq!( + schedule.pgcron_job_name, + "durable_test_cron_create_list_payment-schedule" + ); + + // Cleanup + durable.delete_schedule("payment-schedule").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_schedule_upsert(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_create_schedule_upsert: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_upsert") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + // Create initial schedule + let options = ScheduleOptions { + task_name: "task-v1".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({"version": 1}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("my-schedule", options) + .await + .unwrap(); + + // Update with same name + let options2 = ScheduleOptions { + task_name: "task-v2".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({"version": 2}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("my-schedule", options2) + .await + .unwrap(); + + // Should still be just 1 schedule, but updated + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].task_name, "task-v2"); + assert_eq!(schedules[0].cron_expression, "*/10 * * * *"); + assert_eq!(schedules[0].params, json!({"version": 2})); + + // Cleanup + durable.delete_schedule("my-schedule").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_delete_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_delete_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_delete") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "cleanup-task".to_string(), + cron_expression: "0 0 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("daily-cleanup", options) + .await + .unwrap(); + + // Verify it exists + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + // Delete it + durable.delete_schedule("daily-cleanup").await.unwrap(); + + // Verify it's gone + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 0); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_delete_nonexistent_schedule(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_delete_nonexistent_schedule: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_delete_missing") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let result = durable.delete_schedule("nonexistent").await; + assert!(result.is_err()); + match result.unwrap_err() { + DurableError::ScheduleNotFound { + schedule_name, + queue_name, + } => { + assert_eq!(schedule_name, "nonexistent"); + assert_eq!(queue_name, "test_cron_delete_missing"); + } + other => panic!("expected ScheduleNotFound, got: {other:?}"), + } +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_create_schedule_invalid_cron(pool: PgPool) { + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_invalid") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "my-task".to_string(), + cron_expression: "invalid cron".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + + let result = durable.create_schedule("bad-cron", options).await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + DurableError::InvalidCronExpression { .. } + )); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_schedule_injects_metadata_headers(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_schedule_injects_metadata_headers: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_headers") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options = ScheduleOptions { + task_name: "header-task".to_string(), + cron_expression: "0 0 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("header-test", options) + .await + .unwrap(); + + // Verify the spawn_options in the registry contain the durable:: headers + let schedules = durable.list_schedules(None).await.unwrap(); + assert_eq!(schedules.len(), 1); + + let spawn_opts = &schedules[0].spawn_options; + let headers = spawn_opts.get("headers").expect("should have headers"); + assert_eq!( + headers.get("durable::scheduled_by"), + Some(&json!("header-test")) + ); + assert_eq!(headers.get("durable::cron"), Some(&json!("0 0 * * *"))); + + // Cleanup + durable.delete_schedule("header-test").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_filter_by_metadata(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_filter_by_metadata: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_meta") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + // Create schedules with different metadata + let mut meta_payments = HashMap::new(); + meta_payments.insert("team".to_string(), json!("payments")); + + let mut meta_billing = HashMap::new(); + meta_billing.insert("team".to_string(), json!("billing")); + + let options1 = ScheduleOptions { + task_name: "task-a".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("schedule-a", options1) + .await + .unwrap(); + + let options2 = ScheduleOptions { + task_name: "task-b".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_billing), + }; + durable + .create_schedule("schedule-b", options2) + .await + .unwrap(); + + // Filter by payments team + let filter = ScheduleFilter { + metadata: Some(meta_payments), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "schedule-a"); + + // No filter returns both + let all = durable.list_schedules(None).await.unwrap(); + assert_eq!(all.len(), 2); + + // Cleanup + durable.delete_schedule("schedule-a").await.unwrap(); + durable.delete_schedule("schedule-b").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_filter_by_task_name(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_filter_by_task_name: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_task") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let options1 = ScheduleOptions { + task_name: "process-orders".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("orders-1", options1).await.unwrap(); + + let options2 = ScheduleOptions { + task_name: "process-orders".to_string(), + cron_expression: "*/15 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("orders-2", options2).await.unwrap(); + + let options3 = ScheduleOptions { + task_name: "send-reports".to_string(), + cron_expression: "0 9 * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable.create_schedule("reports", options3).await.unwrap(); + + // Filter by task name + let filter = ScheduleFilter { + task_name: Some("process-orders".to_string()), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 2); + assert!(schedules.iter().all(|s| s.task_name == "process-orders")); + + // Filter by different task name + let filter = ScheduleFilter { + task_name: Some("send-reports".to_string()), + ..Default::default() + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "reports"); + + // Cleanup + durable.delete_schedule("orders-1").await.unwrap(); + durable.delete_schedule("orders-2").await.unwrap(); + durable.delete_schedule("reports").await.unwrap(); +} + +#[sqlx::test(migrator = "MIGRATOR")] +async fn test_list_schedules_combined_filter(pool: PgPool) { + if !pgcron_available(&pool).await { + eprintln!("skipping test_list_schedules_combined_filter: pg_cron not available"); + return; + } + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name("test_cron_filter_combo") + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + let mut meta_payments = HashMap::new(); + meta_payments.insert("team".to_string(), json!("payments")); + + let mut meta_billing = HashMap::new(); + meta_billing.insert("team".to_string(), json!("billing")); + + // Same task, different metadata + let options1 = ScheduleOptions { + task_name: "process-data".to_string(), + cron_expression: "*/5 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("data-payments", options1) + .await + .unwrap(); + + let options2 = ScheduleOptions { + task_name: "process-data".to_string(), + cron_expression: "*/10 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_billing.clone()), + }; + durable + .create_schedule("data-billing", options2) + .await + .unwrap(); + + // Different task, same metadata + let options3 = ScheduleOptions { + task_name: "send-alerts".to_string(), + cron_expression: "0 * * * *".to_string(), + params: json!({}), + spawn_options: SpawnOptions::default(), + metadata: Some(meta_payments.clone()), + }; + durable + .create_schedule("alerts-payments", options3) + .await + .unwrap(); + + // Filter by task + metadata + let filter = ScheduleFilter { + task_name: Some("process-data".to_string()), + metadata: Some(meta_payments), + }; + let schedules = durable.list_schedules(Some(filter)).await.unwrap(); + assert_eq!(schedules.len(), 1); + assert_eq!(schedules[0].name, "data-payments"); + + // Cleanup + durable.delete_schedule("data-payments").await.unwrap(); + durable.delete_schedule("data-billing").await.unwrap(); + durable.delete_schedule("alerts-payments").await.unwrap(); +} From 45fbd3b44448fa2689c95fb1ea74d9db6e44997a Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sat, 28 Feb 2026 15:33:38 +0000 Subject: [PATCH 02/13] fixed schema file --- sql/schema.sql | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sql/schema.sql b/sql/schema.sql index 33917f7..ead541e 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -58,6 +58,26 @@ create table if not exists durable.queues ( created_at timestamptz not null default durable.current_time() ); +create table if not exists durable.cron_schedules ( + schedule_name text not null, + queue_name text not null, + task_name text not null, + cron_expression text not null, + params jsonb not null default '{}'::jsonb, + spawn_options jsonb not null default '{}'::jsonb, + metadata jsonb not null default '{}'::jsonb, + pgcron_job_name text not null, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now(), + primary key (queue_name, schedule_name) +); + +create index if not exists idx_cron_schedules_metadata + on durable.cron_schedules using gin (metadata); + +create index if not exists idx_cron_schedules_task_name + on durable.cron_schedules (queue_name, task_name); + create function durable.ensure_queue_tables (p_queue_name text) returns void language plpgsql From 67d5cbcda7fcf55390ad46a0ccc8ec2653816bb9 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sat, 28 Feb 2026 16:01:41 +0000 Subject: [PATCH 03/13] fixes --- sql/schema.sql | 25 ++++++++ src/client.rs | 4 ++ src/cron.rs | 57 +++++++++++-------- src/error.rs | 13 ++++- .../20260228000000_add_cron_schedules.sql | 51 +++++++++++++++++ 5 files changed, 125 insertions(+), 25 deletions(-) diff --git a/sql/schema.sql b/sql/schema.sql index ead541e..4216663 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -272,12 +272,15 @@ end; $$; -- Drop a queue if it exists. +-- Also cleans up any cron schedules and their pg_cron jobs for the queue. create function durable.drop_queue (p_queue_name text) returns void language plpgsql as $$ declare v_existing_queue text; + v_rec record; + v_jobid bigint; begin select queue_name into v_existing_queue from durable.queues @@ -287,6 +290,28 @@ begin return; end if; + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); diff --git a/src/client.rs b/src/client.rs index c789a7b..0fae58d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -334,6 +334,10 @@ where &self.state } + pub(crate) fn spawn_defaults(&self) -> &SpawnDefaults { + &self.spawn_defaults + } + /// Register a task type. Required before spawning or processing. /// /// Returns an error if a task with the same name is already registered. diff --git a/src/cron.rs b/src/cron.rs index efaf18d..185b229 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -135,7 +135,18 @@ where JsonValue::String(options.cron_expression.clone()), ); - let max_attempts = spawn_options.max_attempts.unwrap_or(5); + let max_attempts = spawn_options + .max_attempts + .unwrap_or(self.spawn_defaults().max_attempts); + let spawn_options = SpawnOptions { + retry_strategy: spawn_options + .retry_strategy + .or_else(|| self.spawn_defaults().retry_strategy.clone()), + cancellation: spawn_options + .cancellation + .or_else(|| self.spawn_defaults().cancellation.clone()), + ..spawn_options + }; let db_options = Self::serialize_spawn_options(&spawn_options, max_attempts) .map_err(DurableError::Serialization)?; @@ -388,12 +399,12 @@ fn validate_cron_field( fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> DurableResult<()> { // Handle step values (e.g., "*/5" or "1-30/5") - let (range_part, step) = if let Some((range, step_str)) = part.split_once('/') { + let (range_part, _step) = if let Some((range, step_str)) = part.split_once('/') { let step: u32 = step_str .parse() .map_err(|_| DurableError::InvalidCronExpression { expression: expr.to_string(), - reason: format!("invalid step value '{step_str}' in {name} field"), + reason: format!("invalid step value `{step_str}` in {name} field"), })?; if step == 0 { return Err(DurableError::InvalidCronExpression { @@ -417,13 +428,13 @@ fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> .parse() .map_err(|_| DurableError::InvalidCronExpression { expression: expr.to_string(), - reason: format!("invalid range start '{start_str}' in {name} field"), + reason: format!("invalid range start `{start_str}` in {name} field"), })?; let end: u32 = end_str .parse() .map_err(|_| DurableError::InvalidCronExpression { expression: expr.to_string(), - reason: format!("invalid range end '{end_str}' in {name} field"), + reason: format!("invalid range end `{end_str}` in {name} field"), })?; if start < min || start > max { @@ -449,15 +460,11 @@ fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> } // Handle single value - if step.is_some() && range_part == "*" { - return Ok(()); - } - let value: u32 = range_part .parse() .map_err(|_| DurableError::InvalidCronExpression { expression: expr.to_string(), - reason: format!("invalid value '{range_part}' in {name} field"), + reason: format!("invalid value `{range_part}` in {name} field"), })?; if value < min || value > max { @@ -473,8 +480,8 @@ fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> /// Validate a schedule name (alphanumeric, hyphens, underscores only; non-empty). fn validate_schedule_name(name: &str) -> DurableResult<()> { if name.is_empty() { - return Err(DurableError::InvalidCronExpression { - expression: String::new(), + return Err(DurableError::InvalidScheduleName { + name: String::new(), reason: "schedule name cannot be empty".to_string(), }); } @@ -483,11 +490,9 @@ fn validate_schedule_name(name: &str) -> DurableResult<()> { .chars() .all(|c| c.is_alphanumeric() || c == '-' || c == '_') { - return Err(DurableError::InvalidCronExpression { - expression: String::new(), - reason: format!( - "schedule name '{name}' contains invalid characters (only alphanumeric, hyphens, and underscores allowed)" - ), + return Err(DurableError::InvalidScheduleName { + name: name.to_string(), + reason: "contains invalid characters (only alphanumeric, hyphens, and underscores allowed)".to_string(), }); } @@ -730,24 +735,30 @@ mod tests { #[test] fn test_invalid_schedule_name_empty() { - assert!(validate_schedule_name("").is_err()); + let err = validate_schedule_name("").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); } #[test] fn test_invalid_schedule_name_spaces() { - assert!(validate_schedule_name("my schedule").is_err()); + let err = validate_schedule_name("my schedule").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); } #[test] fn test_invalid_schedule_name_semicolons() { - assert!(validate_schedule_name("drop;table").is_err()); + let err = validate_schedule_name("drop;table").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); } #[test] fn test_invalid_schedule_name_special_chars() { - assert!(validate_schedule_name("name@here").is_err()); - assert!(validate_schedule_name("name.here").is_err()); - assert!(validate_schedule_name("name/here").is_err()); + let err = validate_schedule_name("name@here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name.here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); + let err = validate_schedule_name("name/here").unwrap_err(); + assert!(matches!(err, DurableError::InvalidScheduleName { .. })); } // --- build_pgcron_spawn_sql tests --- diff --git a/src/error.rs b/src/error.rs index e79bdbd..06392c2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -446,7 +446,7 @@ pub enum DurableError { }, /// Cron expression failed validation. - #[error("invalid cron expression '{expression}': {reason}")] + #[error("invalid cron expression `{expression}`: {reason}")] InvalidCronExpression { /// The invalid cron expression. expression: String, @@ -454,8 +454,17 @@ pub enum DurableError { reason: String, }, + /// Schedule name failed validation. + #[error("invalid schedule name `{name}`: {reason}")] + InvalidScheduleName { + /// The invalid schedule name. + name: String, + /// Why the name is invalid. + reason: String, + }, + /// Schedule not found. - #[error("schedule '{schedule_name}' not found in queue '{queue_name}'")] + #[error("schedule `{schedule_name}` not found in queue `{queue_name}`")] ScheduleNotFound { /// The schedule name that was not found. schedule_name: String, diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql index 376a427..a384bb2 100644 --- a/src/postgres/migrations/20260228000000_add_cron_schedules.sql +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -21,3 +21,54 @@ CREATE INDEX IF NOT EXISTS idx_cron_schedules_metadata CREATE INDEX IF NOT EXISTS idx_cron_schedules_task_name ON durable.cron_schedules (queue_name, task_name); + +-- Override drop_queue to clean up cron schedules and their pg_cron jobs. +CREATE OR REPLACE FUNCTION durable.drop_queue (p_queue_name text) + returns void + language plpgsql +as $$ +declare + v_existing_queue text; + v_rec record; + v_jobid bigint; +begin + select queue_name into v_existing_queue + from durable.queues + where queue_name = p_queue_name; + + if v_existing_queue is null then + return; + end if; + + -- Clean up any cron schedules associated with this queue + for v_rec in + select pgcron_job_name + from durable.cron_schedules + where queue_name = p_queue_name + loop + begin + select jobid into v_jobid + from cron.job + where jobname = v_rec.pgcron_job_name; + + if v_jobid is not null then + perform cron.unschedule(v_jobid); + end if; + exception when others then + -- pg_cron may not be installed; ignore errors + null; + end; + end loop; + + delete from durable.cron_schedules where queue_name = p_queue_name; + + -- Existing drop logic + execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 'r_' || p_queue_name); + execute format('drop table if exists durable.%I cascade', 't_' || p_queue_name); + + delete from durable.queues where queue_name = p_queue_name; +end; +$$; From 521d75708d3e39a01fe8913e4d33ea2bebe19384 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sat, 28 Feb 2026 14:37:28 -0500 Subject: [PATCH 04/13] clean up PR --- src/cron.rs | 6 ++++-- .../migrations/20260228000000_add_cron_schedules.sql | 1 - tests/cron_test.rs | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cron.rs b/src/cron.rs index 185b229..6b6eb7d 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -121,7 +121,7 @@ where validate_cron_expression(&options.cron_expression)?; validate_headers(&options.spawn_options.headers)?; - let pgcron_job_name = format!("durable_{}_{}", self.queue_name(), schedule_name); + let pgcron_job_name = format!("durable::{}::{}", self.queue_name(), schedule_name); // Build spawn options with injected durable:: headers let mut spawn_options = options.spawn_options.clone(); @@ -492,7 +492,9 @@ fn validate_schedule_name(name: &str) -> DurableResult<()> { { return Err(DurableError::InvalidScheduleName { name: name.to_string(), - reason: "contains invalid characters (only alphanumeric, hyphens, and underscores allowed)".to_string(), + reason: + "contains invalid characters (only alphanumeric, hyphens, and underscores allowed)" + .to_string(), }); } diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql index a384bb2..bd4f92a 100644 --- a/src/postgres/migrations/20260228000000_add_cron_schedules.sql +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -62,7 +62,6 @@ begin delete from durable.cron_schedules where queue_name = p_queue_name; - -- Existing drop logic execute format('drop table if exists durable.%I cascade', 'w_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'e_' || p_queue_name); execute format('drop table if exists durable.%I cascade', 'c_' || p_queue_name); diff --git a/tests/cron_test.rs b/tests/cron_test.rs index edca162..856490e 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -80,7 +80,7 @@ async fn test_create_and_list_schedule(pool: PgPool) { assert_eq!(schedule.metadata.get("env"), Some(&json!("production"))); assert_eq!( schedule.pgcron_job_name, - "durable_test_cron_create_list_payment-schedule" + "durable::test_cron_create_list::payment-schedule" ); // Cleanup From d6dbf75815b215963ac1a73946a6b24be98d399a Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sun, 1 Mar 2026 09:42:23 -0500 Subject: [PATCH 05/13] fix clippies --- src/cron.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cron.rs b/src/cron.rs index 6b6eb7d..0ccf95e 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -546,6 +546,7 @@ fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { } #[cfg(test)] +#[expect(clippy::unwrap_used, clippy::panic)] mod tests { use super::*; From c468c4666c9fda887ccd1236ee2f42e525df3863 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sun, 1 Mar 2026 09:51:01 -0500 Subject: [PATCH 06/13] updated readme with interface to cron --- README.md | 107 ++++++++++++++++++ sql/schema.sql | 4 +- src/cron.rs | 4 +- .../20260228000000_add_cron_schedules.sql | 4 +- 4 files changed, 113 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 2d6e5e6..320c069 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Use at your own risk. - **Await events** - Pause until external events arrive (with optional timeouts) - **Retry on failure** - Configurable retry strategies with exponential backoff - **Scale horizontally** - Multiple workers can process tasks concurrently +- **Cron scheduling** - Run tasks on recurring schedules via pg_cron integration Unlike exception-based durable execution systems (Python, TypeScript), this SDK uses Rust's `Result` type for suspension control flow, making it idiomatic and type-safe. @@ -260,6 +261,101 @@ client.emit_event( ).await?; ``` +### Cron Scheduling + +Schedule tasks to run on a recurring basis using [pg_cron](https://github.com/citusdata/pg_cron). Durable manages the pg_cron jobs and maintains a registry for discovery and filtering. + +**Setup** - Enable pg_cron once at startup: + +```rust +use durable::setup_pgcron; + +// Enable the pg_cron extension (requires superuser or appropriate privileges) +setup_pgcron(client.pool()).await?; +``` + +**Create a schedule:** + +```rust +use durable::ScheduleOptions; + +let options = ScheduleOptions { + task_name: "process-payments".to_string(), + cron_expression: "*/5 * * * *".to_string(), // Every 5 minutes + params: json!({"batch_size": 100}), + spawn_options: SpawnOptions::default(), + metadata: Some(HashMap::from([ + ("team".to_string(), json!("payments")), + ("env".to_string(), json!("production")), + ])), +}; + +// Creates or updates the schedule (upsert semantics) +client.create_schedule("payment-schedule", options).await?; +``` + +**List and filter schedules:** + +```rust +use durable::ScheduleFilter; + +// List all schedules on this queue +let all = client.list_schedules(None).await?; + +// Filter by task name +let filter = ScheduleFilter { + task_name: Some("process-payments".to_string()), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; + +// Filter by metadata (JSONB containment) +let filter = ScheduleFilter { + metadata: Some(HashMap::from([("team".to_string(), json!("payments"))])), + ..Default::default() +}; +let filtered = client.list_schedules(Some(filter)).await?; +``` + +**Delete a schedule:** + +```rust +// Removes the schedule and its pg_cron job. In-flight tasks are not cancelled. +client.delete_schedule("payment-schedule").await?; +``` + +**Key behaviors:** + +- **pg_cron integration** - Schedules are backed by PostgreSQL's pg_cron extension. At each tick, pg_cron inserts a task into the queue via `durable.spawn_task()`, and workers pick it up normally. +- **Upsert semantics** - Calling `create_schedule` with an existing name updates the schedule in place. +- **Origin tracking** - Scheduled tasks automatically receive `durable::scheduled_by` and `durable::cron` headers, so tasks can identify how they were spawned. +- **Metadata filtering** - Attach arbitrary JSON metadata to schedules and filter with JSONB containment queries. +- **Queue cleanup** - Dropping a queue automatically unschedules all its cron jobs. + +### Polling Task Results + +You can poll for the result of a spawned task without running a worker in the same process: + +```rust +use durable::TaskStatus; + +let spawned = client.spawn::(params).await?; + +// Poll for the result +let result = client.get_task_result(spawned.task_id).await?; + +if let Some(poll) = result { + match poll.status { + TaskStatus::Completed => println!("Output: {:?}", poll.output), + TaskStatus::Failed => println!("Error: {:?}", poll.error), + TaskStatus::Pending | TaskStatus::Running | TaskStatus::Sleeping => { + println!("Still in progress...") + } + TaskStatus::Cancelled => println!("Task was cancelled"), + } +} +``` + ### Transactional Spawning You can atomically enqueue a task as part of a larger database transaction. This ensures that either both your write and the task spawn succeed, or neither does: @@ -318,11 +414,22 @@ This is useful when you need to guarantee that a task is only enqueued if relate | [`RetryStrategy`] | Retry behavior: `None`, `Fixed`, or `Exponential` | | [`CancellationPolicy`] | Auto-cancel tasks based on delay or duration | +### Cron Scheduling + +| Type | Description | +|------|-------------| +| [`ScheduleOptions`] | Options for creating a cron schedule (task, expression, params, metadata) | +| [`ScheduleInfo`] | Information about an existing schedule | +| [`ScheduleFilter`] | Filter for listing schedules (by task name or metadata) | +| [`setup_pgcron()`] | Initialize the pg_cron extension | + ### Results | Type | Description | |------|-------------| | [`SpawnResult`] | Returned when spawning a task (task_id, run_id, attempt) | +| [`TaskPollResult`] | Result of polling a task (status, output, error) | +| [`TaskStatus`] | Task state: `Pending`, `Running`, `Sleeping`, `Completed`, `Failed`, `Cancelled` | | [`ControlFlow`] | Signals for suspension and cancellation | ## Environment Variables diff --git a/sql/schema.sql b/sql/schema.sql index 4216663..fc678bc 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -67,8 +67,8 @@ create table if not exists durable.cron_schedules ( spawn_options jsonb not null default '{}'::jsonb, metadata jsonb not null default '{}'::jsonb, pgcron_job_name text not null, - created_at timestamptz not null default now(), - updated_at timestamptz not null default now(), + created_at timestamptz not null default durable.current_time(), + updated_at timestamptz not null default durable.current_time(), primary key (queue_name, schedule_name) ); diff --git a/src/cron.rs b/src/cron.rs index 0ccf95e..7dd5120 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -191,7 +191,7 @@ where sqlx::query( "INSERT INTO durable.cron_schedules (schedule_name, queue_name, task_name, cron_expression, params, spawn_options, metadata, pgcron_job_name, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now(), now()) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, durable.current_time(), durable.current_time()) ON CONFLICT (queue_name, schedule_name) DO UPDATE SET task_name = EXCLUDED.task_name, @@ -200,7 +200,7 @@ where spawn_options = EXCLUDED.spawn_options, metadata = EXCLUDED.metadata, pgcron_job_name = EXCLUDED.pgcron_job_name, - updated_at = now()" + updated_at = durable.current_time()" ) .bind(schedule_name) .bind(self.queue_name()) diff --git a/src/postgres/migrations/20260228000000_add_cron_schedules.sql b/src/postgres/migrations/20260228000000_add_cron_schedules.sql index bd4f92a..223b0fd 100644 --- a/src/postgres/migrations/20260228000000_add_cron_schedules.sql +++ b/src/postgres/migrations/20260228000000_add_cron_schedules.sql @@ -11,8 +11,8 @@ CREATE TABLE IF NOT EXISTS durable.cron_schedules ( spawn_options JSONB NOT NULL DEFAULT '{}'::jsonb, metadata JSONB NOT NULL DEFAULT '{}'::jsonb, pgcron_job_name TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + created_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT durable.current_time(), PRIMARY KEY (queue_name, schedule_name) ); From 5f458602b3fe0149e9dc079b6ac62338377b77bb Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Sun, 1 Mar 2026 12:53:15 -0500 Subject: [PATCH 07/13] reject names that contain $durable --- src/cron.rs | 55 +++++++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/cron.rs b/src/cron.rs index 7dd5120..aeff843 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -156,7 +156,7 @@ where &options.task_name, &options.params, &db_options, - ); + )?; let metadata_value = match &options.metadata { Some(m) => serde_json::to_value(m).map_err(DurableError::Serialization)?, @@ -504,14 +504,16 @@ fn validate_schedule_name(name: &str) -> DurableResult<()> { // --- SQL escaping --- /// Dollar-quote a string using `$durable$` as the delimiter. -/// Falls back to escaped single quotes if the content contains `$durable$`. -fn pg_literal(s: &str) -> String { - if !s.contains("$durable$") { - format!("$durable${s}$durable$") - } else { - // Fallback: single-quote escaping (double up any single quotes) - format!("'{}'", s.replace('\'', "''")) +/// Returns an error if the content contains `$durable`, which would break the delimiter. +fn pg_literal(s: &str) -> Result { + if s.contains("$durable") { + return Err(DurableError::InvalidConfiguration { + reason: format!( + "string contains reserved delimiter sequence '$durable': {s}" + ), + }); } + Ok(format!("$durable${s}$durable$")) } /// Build the SQL command that pg_cron will execute to spawn a task. @@ -520,17 +522,17 @@ fn build_pgcron_spawn_sql( task_name: &str, params: &JsonValue, spawn_options: &JsonValue, -) -> String { +) -> Result { let params_str = params.to_string(); let options_str = spawn_options.to_string(); - format!( + Ok(format!( "SELECT durable.spawn_task({}, {}, {}::jsonb, {}::jsonb)", - pg_literal(queue_name), - pg_literal(task_name), - pg_literal(¶ms_str), - pg_literal(&options_str), - ) + pg_literal(queue_name)?, + pg_literal(task_name)?, + pg_literal(¶ms_str)?, + pg_literal(&options_str)?, + )) } /// Map pgcron-related SQL errors to more descriptive error messages. @@ -699,30 +701,33 @@ mod tests { #[test] fn test_pg_literal_simple() { - assert_eq!(pg_literal("hello"), "$durable$hello$durable$"); + assert_eq!(pg_literal("hello").unwrap(), "$durable$hello$durable$"); } #[test] fn test_pg_literal_with_single_quotes() { - assert_eq!(pg_literal("it's a test"), "$durable$it's a test$durable$"); + assert_eq!( + pg_literal("it's a test").unwrap(), + "$durable$it's a test$durable$" + ); } #[test] fn test_pg_literal_with_json() { let json = r#"{"key": "value"}"#; - assert_eq!(pg_literal(json), format!("$durable${json}$durable$")); + assert_eq!(pg_literal(json).unwrap(), format!("$durable${json}$durable$")); } #[test] - fn test_pg_literal_fallback_on_delimiter_conflict() { - let content = "contains $durable$ in it"; - assert_eq!(pg_literal(content), "'contains $durable$ in it'"); + fn test_pg_literal_rejects_delimiter() { + assert!(pg_literal("contains $durable$ in it").is_err()); } #[test] - fn test_pg_literal_fallback_escapes_quotes() { - let content = "has $durable$ and 'quotes'"; - assert_eq!(pg_literal(content), "'has $durable$ and ''quotes'''"); + fn test_pg_literal_rejects_partial_delimiter() { + assert!(pg_literal("test$durable").is_err()); + assert!(pg_literal("$durablefoo").is_err()); + assert!(pg_literal("mid$durablemid").is_err()); } // --- Schedule name validation tests --- @@ -770,7 +775,7 @@ mod tests { fn test_build_pgcron_spawn_sql() { let params = serde_json::json!({"key": "value"}); let options = serde_json::json!({"max_attempts": 3}); - let sql = build_pgcron_spawn_sql("my_queue", "my_task", ¶ms, &options); + let sql = build_pgcron_spawn_sql("my_queue", "my_task", ¶ms, &options).unwrap(); assert!(sql.contains("durable.spawn_task")); assert!(sql.contains("my_queue")); assert!(sql.contains("my_task")); From 147f7149acbc191907e9ffaded9ce121c1b9b305 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 10:35:21 -0500 Subject: [PATCH 08/13] fmtted --- src/cron.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/cron.rs b/src/cron.rs index aeff843..701098c 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -508,9 +508,7 @@ fn validate_schedule_name(name: &str) -> DurableResult<()> { fn pg_literal(s: &str) -> Result { if s.contains("$durable") { return Err(DurableError::InvalidConfiguration { - reason: format!( - "string contains reserved delimiter sequence '$durable': {s}" - ), + reason: format!("string contains reserved delimiter sequence '$durable': {s}"), }); } Ok(format!("$durable${s}$durable$")) @@ -715,7 +713,10 @@ mod tests { #[test] fn test_pg_literal_with_json() { let json = r#"{"key": "value"}"#; - assert_eq!(pg_literal(json).unwrap(), format!("$durable${json}$durable$")); + assert_eq!( + pg_literal(json).unwrap(), + format!("$durable${json}$durable$") + ); } #[test] From c22165557a7f26babea015f0f9e6875210d86862 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 12:18:33 -0500 Subject: [PATCH 09/13] fixed PR comments --- .github/workflows/ci.yml | 18 +-- docker-compose.yml | 3 +- src/cron.rs | 302 ++------------------------------------- src/error.rs | 9 -- tests/cron_test.rs | 68 +-------- 5 files changed, 21 insertions(+), 379 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 439ab08..b80da00 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,24 +14,12 @@ jobs: ci: runs-on: ubuntu-latest - services: - postgres: - image: postgres:14-alpine - env: - POSTGRES_DB: test - POSTGRES_USER: postgres - POSTGRES_PASSWORD: postgres - ports: - - 5436:5432 - options: >- - --health-cmd "pg_isready -U postgres -d test" - --health-interval 10s - --health-timeout 5s - --health-retries 5 - steps: - uses: actions/checkout@v4 + - name: Start Postgres + run: docker compose up -d --wait + - name: Install uv uses: astral-sh/setup-uv@v4 diff --git a/docker-compose.yml b/docker-compose.yml index d14e472..481defa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,7 @@ services: postgres: - image: postgres:14-alpine + image: tensorzero/postgres-dev:14-trixie-slim + command: postgres -c shared_preload_libraries=pg_cron -c cron.database_name=test environment: POSTGRES_DB: test POSTGRES_USER: postgres diff --git a/src/cron.rs b/src/cron.rs index 701098c..e852c99 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -108,9 +108,9 @@ where /// /// Returns an error if: /// - The schedule name is invalid - /// - The cron expression is invalid /// - Headers use the reserved `durable::` prefix /// - pg_cron is not available + /// - The cron expression is rejected by pg_cron pub async fn create_schedule( &self, schedule_name: &str, @@ -118,9 +118,20 @@ where ) -> DurableResult<()> { // Validate inputs validate_schedule_name(schedule_name)?; - validate_cron_expression(&options.cron_expression)?; validate_headers(&options.spawn_options.headers)?; + // Check pg_cron availability early before doing any work + let exists: (bool,) = + sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") + .fetch_one(self.pool()) + .await?; + + if !exists.0 { + return Err(DurableError::PgCronUnavailable { + reason: "pg_cron extension is not installed".to_string(), + }); + } + let pgcron_job_name = format!("durable::{}::{}", self.queue_name(), schedule_name); // Build spawn options with injected durable:: headers @@ -163,18 +174,6 @@ where None => serde_json::json!({}), }; - // Check pg_cron availability - let exists: (bool,) = - sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") - .fetch_one(self.pool()) - .await?; - - if !exists.0 { - return Err(DurableError::PgCronUnavailable { - reason: "pg_cron extension is not installed".to_string(), - }); - } - // Execute in a transaction let mut tx = self.pool().begin().await?; @@ -349,134 +348,6 @@ struct ScheduleRow { // --- Validation helpers --- -/// Validate a 5-field standard cron expression. -fn validate_cron_expression(expr: &str) -> DurableResult<()> { - let fields: Vec<&str> = expr.split_whitespace().collect(); - if fields.len() != 5 { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("expected 5 fields, got {}", fields.len()), - }); - } - - let field_names = ["minute", "hour", "day-of-month", "month", "day-of-week"]; - let field_ranges: [(u32, u32); 5] = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]; - - for (i, field) in fields.iter().enumerate() { - validate_cron_field( - expr, - field, - field_names[i], - field_ranges[i].0, - field_ranges[i].1, - )?; - } - - Ok(()) -} - -fn validate_cron_field( - expr: &str, - field: &str, - name: &str, - min: u32, - max: u32, -) -> DurableResult<()> { - if field.is_empty() { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("{name} field is empty"), - }); - } - - // Split by comma for lists (e.g., "1,15,30") - for part in field.split(',') { - validate_cron_part(expr, part, name, min, max)?; - } - - Ok(()) -} - -fn validate_cron_part(expr: &str, part: &str, name: &str, min: u32, max: u32) -> DurableResult<()> { - // Handle step values (e.g., "*/5" or "1-30/5") - let (range_part, _step) = if let Some((range, step_str)) = part.split_once('/') { - let step: u32 = step_str - .parse() - .map_err(|_| DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("invalid step value `{step_str}` in {name} field"), - })?; - if step == 0 { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("step value cannot be 0 in {name} field"), - }); - } - (range, Some(step)) - } else { - (part, None) - }; - - // Handle wildcard - if range_part == "*" { - return Ok(()); - } - - // Handle range (e.g., "1-30") - if let Some((start_str, end_str)) = range_part.split_once('-') { - let start: u32 = start_str - .parse() - .map_err(|_| DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("invalid range start `{start_str}` in {name} field"), - })?; - let end: u32 = end_str - .parse() - .map_err(|_| DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("invalid range end `{end_str}` in {name} field"), - })?; - - if start < min || start > max { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("{name} range start {start} out of range {min}-{max}"), - }); - } - if end < min || end > max { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("{name} range end {end} out of range {min}-{max}"), - }); - } - if start > end { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("{name} range start {start} is greater than end {end}"), - }); - } - - return Ok(()); - } - - // Handle single value - let value: u32 = range_part - .parse() - .map_err(|_| DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("invalid value `{range_part}` in {name} field"), - })?; - - if value < min || value > max { - return Err(DurableError::InvalidCronExpression { - expression: expr.to_string(), - reason: format!("{name} value {value} out of range {min}-{max}"), - }); - } - - Ok(()) -} - /// Validate a schedule name (alphanumeric, hyphens, underscores only; non-empty). fn validate_schedule_name(name: &str) -> DurableResult<()> { if name.is_empty() { @@ -546,155 +417,10 @@ fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { } #[cfg(test)] -#[expect(clippy::unwrap_used, clippy::panic)] +#[expect(clippy::unwrap_used)] mod tests { use super::*; - // --- Cron expression validation tests --- - - #[test] - fn test_valid_cron_every_minute() { - assert!(validate_cron_expression("* * * * *").is_ok()); - } - - #[test] - fn test_valid_cron_every_5_minutes() { - assert!(validate_cron_expression("*/5 * * * *").is_ok()); - } - - #[test] - fn test_valid_cron_range_with_step() { - assert!(validate_cron_expression("0-30/5 * * * *").is_ok()); - } - - #[test] - fn test_valid_cron_list() { - assert!(validate_cron_expression("1,15,30 * * * *").is_ok()); - } - - #[test] - fn test_valid_cron_specific_time() { - assert!(validate_cron_expression("0 9 * * 1").is_ok()); - } - - #[test] - fn test_valid_cron_complex() { - assert!(validate_cron_expression("30 2 1,15 * 0-5").is_ok()); - } - - #[test] - fn test_valid_cron_weekday_7() { - // 7 is valid for day-of-week (Sunday in some implementations) - assert!(validate_cron_expression("0 0 * * 7").is_ok()); - } - - #[test] - fn test_invalid_cron_too_few_fields() { - let result = validate_cron_expression("* * *"); - assert!(result.is_err()); - let err = result.unwrap_err(); - match err { - DurableError::InvalidCronExpression { reason, .. } => { - assert!(reason.contains("expected 5 fields")); - } - _ => panic!("unexpected error type"), - } - } - - #[test] - fn test_invalid_cron_too_many_fields() { - let result = validate_cron_expression("* * * * * *"); - assert!(result.is_err()); - let err = result.unwrap_err(); - match err { - DurableError::InvalidCronExpression { reason, .. } => { - assert!(reason.contains("expected 5 fields")); - } - _ => panic!("unexpected error type"), - } - } - - #[test] - fn test_invalid_cron_6_field_seconds() { - let result = validate_cron_expression("0 */5 * * * *"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_out_of_range_minute() { - let result = validate_cron_expression("60 * * * *"); - assert!(result.is_err()); - let err = result.unwrap_err(); - match err { - DurableError::InvalidCronExpression { reason, .. } => { - assert!(reason.contains("out of range")); - } - _ => panic!("unexpected error type"), - } - } - - #[test] - fn test_invalid_cron_out_of_range_hour() { - let result = validate_cron_expression("0 24 * * *"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_out_of_range_day() { - let result = validate_cron_expression("0 0 32 * *"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_out_of_range_month() { - let result = validate_cron_expression("0 0 * 13 *"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_out_of_range_weekday() { - let result = validate_cron_expression("0 0 * * 8"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_empty() { - let result = validate_cron_expression(""); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_letters() { - let result = validate_cron_expression("abc * * * *"); - assert!(result.is_err()); - } - - #[test] - fn test_invalid_cron_zero_step() { - let result = validate_cron_expression("*/0 * * * *"); - assert!(result.is_err()); - let err = result.unwrap_err(); - match err { - DurableError::InvalidCronExpression { reason, .. } => { - assert!(reason.contains("step value cannot be 0")); - } - _ => panic!("unexpected error type"), - } - } - - #[test] - fn test_invalid_cron_reversed_range() { - let result = validate_cron_expression("30-10 * * * *"); - assert!(result.is_err()); - let err = result.unwrap_err(); - match err { - DurableError::InvalidCronExpression { reason, .. } => { - assert!(reason.contains("greater than end")); - } - _ => panic!("unexpected error type"), - } - } - // --- pg_literal tests --- #[test] diff --git a/src/error.rs b/src/error.rs index 06392c2..cd460ee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -445,15 +445,6 @@ pub enum DurableError { reason: String, }, - /// Cron expression failed validation. - #[error("invalid cron expression `{expression}`: {reason}")] - InvalidCronExpression { - /// The invalid cron expression. - expression: String, - /// Why the expression is invalid. - reason: String, - }, - /// Schedule name failed validation. #[error("invalid schedule name `{name}`: {reason}")] InvalidScheduleName { diff --git a/tests/cron_test.rs b/tests/cron_test.rs index 856490e..f4ba30d 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -9,39 +9,13 @@ use serde_json::json; use sqlx::PgPool; use std::collections::HashMap; -/// Check if pg_cron is available in this database. Tests that require pg_cron -/// should call this and return early if it's not installed. -async fn pgcron_available(pool: &PgPool) -> bool { - let result: Result<(bool,), _> = - sqlx::query_as("SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_cron')") - .fetch_one(pool) - .await; - match result { - Ok((exists,)) => exists, - Err(_) => false, - } -} - #[sqlx::test(migrator = "MIGRATOR")] async fn test_setup_pgcron(pool: PgPool) { - // If pg_cron extension is available, setup should succeed - // If not, it should return PgCronUnavailable - let result = setup_pgcron(&pool).await; - if pgcron_available(&pool).await { - result.unwrap(); - } else { - let err = result.unwrap_err(); - assert!(matches!(err, DurableError::PgCronUnavailable { .. })); - } + setup_pgcron(&pool).await.unwrap(); } #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_and_list_schedule(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_create_and_list_schedule: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_create_list") @@ -89,11 +63,6 @@ async fn test_create_and_list_schedule(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_schedule_upsert(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_create_schedule_upsert: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_upsert") @@ -142,11 +111,6 @@ async fn test_create_schedule_upsert(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_delete_schedule(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_delete_schedule: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_delete") @@ -182,11 +146,6 @@ async fn test_delete_schedule(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_delete_nonexistent_schedule(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_delete_nonexistent_schedule: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_delete_missing") @@ -229,21 +188,13 @@ async fn test_create_schedule_invalid_cron(pool: PgPool) { metadata: None, }; + // pg_cron validates the expression and the transaction should fail let result = durable.create_schedule("bad-cron", options).await; assert!(result.is_err()); - assert!(matches!( - result.unwrap_err(), - DurableError::InvalidCronExpression { .. } - )); } #[sqlx::test(migrator = "MIGRATOR")] async fn test_schedule_injects_metadata_headers(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_schedule_injects_metadata_headers: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_headers") @@ -283,11 +234,6 @@ async fn test_schedule_injects_metadata_headers(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_filter_by_metadata(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_list_schedules_filter_by_metadata: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_meta") @@ -348,11 +294,6 @@ async fn test_list_schedules_filter_by_metadata(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_filter_by_task_name(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_list_schedules_filter_by_task_name: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_task") @@ -415,11 +356,6 @@ async fn test_list_schedules_filter_by_task_name(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_combined_filter(pool: PgPool) { - if !pgcron_available(&pool).await { - eprintln!("skipping test_list_schedules_combined_filter: pg_cron not available"); - return; - } - let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_combo") From fc98566e0fb7960336e34fbbc88e00791bacf477 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 12:46:32 -0500 Subject: [PATCH 10/13] set up pgcron in each test that needs it --- tests/cron_test.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/cron_test.rs b/tests/cron_test.rs index f4ba30d..bec266a 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -16,6 +16,8 @@ async fn test_setup_pgcron(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_and_list_schedule(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_create_list") @@ -63,6 +65,8 @@ async fn test_create_and_list_schedule(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_schedule_upsert(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_upsert") @@ -111,6 +115,8 @@ async fn test_create_schedule_upsert(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_delete_schedule(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_delete") @@ -171,6 +177,8 @@ async fn test_delete_nonexistent_schedule(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_create_schedule_invalid_cron(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_invalid") @@ -195,6 +203,8 @@ async fn test_create_schedule_invalid_cron(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_schedule_injects_metadata_headers(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_headers") @@ -234,6 +244,8 @@ async fn test_schedule_injects_metadata_headers(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_filter_by_metadata(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_meta") @@ -294,6 +306,8 @@ async fn test_list_schedules_filter_by_metadata(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_filter_by_task_name(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_task") @@ -356,6 +370,8 @@ async fn test_list_schedules_filter_by_task_name(pool: PgPool) { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_schedules_combined_filter(pool: PgPool) { + setup_pgcron(&pool).await.unwrap(); + let durable = Durable::builder() .pool(pool.clone()) .queue_name("test_cron_filter_combo") From 491f062b0792163ffe23d29bb978a5d2748732f2 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 16:29:24 -0500 Subject: [PATCH 11/13] fixed tests --- docker-compose.yml | 5 +- tests/cron_test.rs | 209 +++++++++++++++++++++++++++++++++------------ 2 files changed, 158 insertions(+), 56 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 481defa..7122877 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,14 @@ services: postgres: + build: + context: ../../tensorzero-core/tests/e2e + dockerfile: Dockerfile.postgres image: tensorzero/postgres-dev:14-trixie-slim - command: postgres -c shared_preload_libraries=pg_cron -c cron.database_name=test environment: POSTGRES_DB: test POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres + command: ["postgres", "-c", "cron.database_name=test"] ports: - "5436:5432" healthcheck: diff --git a/tests/cron_test.rs b/tests/cron_test.rs index bec266a..87ccc1d 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -6,21 +6,52 @@ use durable::{ Durable, DurableError, MIGRATOR, ScheduleFilter, ScheduleOptions, SpawnOptions, setup_pgcron, }; use serde_json::json; -use sqlx::PgPool; +use sqlx::migrate::MigrateDatabase; +use sqlx::{AssertSqlSafe, PgPool, Row}; use std::collections::HashMap; -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_setup_pgcron(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +/// Connect to the real `test` database (where pg_cron lives) and run migrations. +/// `sqlx::test` creates temporary databases where pg_cron cannot be installed, +/// so cron tests must use the real database instead. +async fn setup_pool() -> PgPool { + let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + + // Ensure the database exists + if !sqlx::Postgres::database_exists(&url).await.unwrap() { + sqlx::Postgres::create_database(&url).await.unwrap(); + } + + let pool = PgPool::connect(&url).await.expect("connect to test db"); + MIGRATOR.run(&pool).await.expect("run migrations"); + setup_pgcron(&pool).await.expect("setup pg_cron"); + pool +} + +/// Clean up a queue after a test (removes cron schedules, pg_cron jobs, and queue tables). +async fn cleanup_queue(pool: &PgPool, queue_name: &str) { + sqlx::query("SELECT durable.drop_queue($1)") + .bind(queue_name) + .execute(pool) + .await + .expect("cleanup queue"); } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_create_and_list_schedule(pool: PgPool) { +#[tokio::test] +async fn test_setup_pgcron() { + let url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let pool = PgPool::connect(&url).await.expect("connect to test db"); + MIGRATOR.run(&pool).await.expect("run migrations"); setup_pgcron(&pool).await.unwrap(); +} + +#[tokio::test] +async fn test_create_and_list_schedule() { + let pool = setup_pool().await; + let queue = "test_cron_create_list"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_create_list") + .queue_name(queue) .build() .await .unwrap(); @@ -59,17 +90,17 @@ async fn test_create_and_list_schedule(pool: PgPool) { "durable::test_cron_create_list::payment-schedule" ); - // Cleanup - durable.delete_schedule("payment-schedule").await.unwrap(); + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_create_schedule_upsert(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_create_schedule_upsert() { + let pool = setup_pool().await; + let queue = "test_cron_upsert"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_upsert") + .queue_name(queue) .build() .await .unwrap(); @@ -109,17 +140,17 @@ async fn test_create_schedule_upsert(pool: PgPool) { assert_eq!(schedules[0].cron_expression, "*/10 * * * *"); assert_eq!(schedules[0].params, json!({"version": 2})); - // Cleanup - durable.delete_schedule("my-schedule").await.unwrap(); + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_delete_schedule(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_delete_schedule() { + let pool = setup_pool().await; + let queue = "test_cron_delete"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_delete") + .queue_name(queue) .build() .await .unwrap(); @@ -148,13 +179,18 @@ async fn test_delete_schedule(pool: PgPool) { // Verify it's gone let schedules = durable.list_schedules(None).await.unwrap(); assert_eq!(schedules.len(), 0); + + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_delete_nonexistent_schedule(pool: PgPool) { +#[tokio::test] +async fn test_delete_nonexistent_schedule() { + let pool = setup_pool().await; + let queue = "test_cron_delete_missing"; + let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_delete_missing") + .queue_name(queue) .build() .await .unwrap(); @@ -173,15 +209,18 @@ async fn test_delete_nonexistent_schedule(pool: PgPool) { } other => panic!("expected ScheduleNotFound, got: {other:?}"), } + + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_create_schedule_invalid_cron(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_create_schedule_invalid_cron() { + let pool = setup_pool().await; + let queue = "test_cron_invalid"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_invalid") + .queue_name(queue) .build() .await .unwrap(); @@ -199,15 +238,18 @@ async fn test_create_schedule_invalid_cron(pool: PgPool) { // pg_cron validates the expression and the transaction should fail let result = durable.create_schedule("bad-cron", options).await; assert!(result.is_err()); + + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_schedule_injects_metadata_headers(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_schedule_injects_metadata_headers() { + let pool = setup_pool().await; + let queue = "test_cron_headers"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_headers") + .queue_name(queue) .build() .await .unwrap(); @@ -238,17 +280,17 @@ async fn test_schedule_injects_metadata_headers(pool: PgPool) { ); assert_eq!(headers.get("durable::cron"), Some(&json!("0 0 * * *"))); - // Cleanup - durable.delete_schedule("header-test").await.unwrap(); + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_list_schedules_filter_by_metadata(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_list_schedules_filter_by_metadata() { + let pool = setup_pool().await; + let queue = "test_cron_filter_meta"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_filter_meta") + .queue_name(queue) .build() .await .unwrap(); @@ -299,18 +341,17 @@ async fn test_list_schedules_filter_by_metadata(pool: PgPool) { let all = durable.list_schedules(None).await.unwrap(); assert_eq!(all.len(), 2); - // Cleanup - durable.delete_schedule("schedule-a").await.unwrap(); - durable.delete_schedule("schedule-b").await.unwrap(); + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_list_schedules_filter_by_task_name(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_list_schedules_filter_by_task_name() { + let pool = setup_pool().await; + let queue = "test_cron_filter_task"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_filter_task") + .queue_name(queue) .build() .await .unwrap(); @@ -362,19 +403,17 @@ async fn test_list_schedules_filter_by_task_name(pool: PgPool) { assert_eq!(schedules.len(), 1); assert_eq!(schedules[0].name, "reports"); - // Cleanup - durable.delete_schedule("orders-1").await.unwrap(); - durable.delete_schedule("orders-2").await.unwrap(); - durable.delete_schedule("reports").await.unwrap(); + cleanup_queue(&pool, queue).await; } -#[sqlx::test(migrator = "MIGRATOR")] -async fn test_list_schedules_combined_filter(pool: PgPool) { - setup_pgcron(&pool).await.unwrap(); +#[tokio::test] +async fn test_list_schedules_combined_filter() { + let pool = setup_pool().await; + let queue = "test_cron_filter_combo"; let durable = Durable::builder() .pool(pool.clone()) - .queue_name("test_cron_filter_combo") + .queue_name(queue) .build() .await .unwrap(); @@ -434,8 +473,68 @@ async fn test_list_schedules_combined_filter(pool: PgPool) { assert_eq!(schedules.len(), 1); assert_eq!(schedules[0].name, "data-payments"); - // Cleanup - durable.delete_schedule("data-payments").await.unwrap(); - durable.delete_schedule("data-billing").await.unwrap(); - durable.delete_schedule("alerts-payments").await.unwrap(); + cleanup_queue(&pool, queue).await; +} + +#[tokio::test] +async fn test_pgcron_job_actually_spawns_task() { + let pool = setup_pool().await; + let queue = "test_cron_fires"; + + let durable = Durable::builder() + .pool(pool.clone()) + .queue_name(queue) + .build() + .await + .unwrap(); + + durable.create_queue(None).await.unwrap(); + + // Schedule a job that fires every 2 seconds + let options = ScheduleOptions { + task_name: "cron-ping".to_string(), + cron_expression: "2 seconds".to_string(), + params: json!({"source": "cron"}), + spawn_options: SpawnOptions::default(), + metadata: None, + }; + durable + .create_schedule("fast-schedule", options) + .await + .unwrap(); + + // Poll the task table until pg_cron spawns at least one task (up to 3s timeout) + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_secs(3); + let mut count: i64 = 0; + while start.elapsed() < timeout { + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + let row: (i64,) = sqlx::query_as(AssertSqlSafe(format!( + "SELECT COUNT(*) FROM durable.t_{queue}" + ))) + .fetch_one(&pool) + .await + .unwrap(); + count = row.0; + if count > 0 { + break; + } + } + + assert!(count > 0, "pg_cron should have spawned at least one task"); + + // Verify the spawned task has the right task_name and params + let row = sqlx::query(AssertSqlSafe(format!( + "SELECT task_name, params FROM durable.t_{queue} LIMIT 1" + ))) + .fetch_one(&pool) + .await + .unwrap(); + + let task_name: &str = row.get("task_name"); + let params: serde_json::Value = row.get("params"); + assert_eq!(task_name, "cron-ping"); + assert_eq!(params, json!({"source": "cron"})); + + cleanup_queue(&pool, queue).await; } From ffb26bfc9b2e56279c5fdddc1404918a54331656 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 18:52:26 -0500 Subject: [PATCH 12/13] attempt to catch only missing cron errors as missing cron --- src/cron.rs | 69 +++++++++++++++++++++++++++++++++++++++++++++- tests/cron_test.rs | 9 ++++-- 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/src/cron.rs b/src/cron.rs index e852c99..9fccd88 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -407,7 +407,12 @@ fn build_pgcron_spawn_sql( /// Map pgcron-related SQL errors to more descriptive error messages. fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { let err_str = err.to_string(); - if err_str.contains("cron") || err_str.contains("pg_cron") { + // Only classify as PgCronUnavailable when the schema or function is missing, + // not when pg_cron rejects invalid input (e.g. a bad cron expression). + if err_str.contains("schema \"cron\" does not exist") + || err_str.contains("function cron.schedule") + || err_str.contains("function cron.unschedule") + { DurableError::PgCronUnavailable { reason: format!("pg_cron error during {operation}: {err_str}"), } @@ -508,4 +513,66 @@ mod tests { assert!(sql.contains("my_task")); assert!(sql.contains("::jsonb")); } + + // --- map_pgcron_error tests --- + + #[test] + fn test_map_pgcron_error_schema_missing() { + let err = sqlx::Error::Protocol( + r#"error returned from database: schema "cron" does not exist"#.to_string(), + ); + let result = map_pgcron_error(err, "create"); + assert!( + matches!(result, DurableError::PgCronUnavailable { .. }), + "expected PgCronUnavailable, got: {result:?}" + ); + } + + #[test] + fn test_map_pgcron_error_function_schedule_missing() { + let err = sqlx::Error::Protocol( + "error returned from database: function cron.schedule(unknown, unknown, unknown) does not exist".to_string(), + ); + let result = map_pgcron_error(err, "create"); + assert!( + matches!(result, DurableError::PgCronUnavailable { .. }), + "expected PgCronUnavailable, got: {result:?}" + ); + } + + #[test] + fn test_map_pgcron_error_function_unschedule_missing() { + let err = sqlx::Error::Protocol( + "error returned from database: function cron.unschedule(bigint) does not exist" + .to_string(), + ); + let result = map_pgcron_error(err, "delete"); + assert!( + matches!(result, DurableError::PgCronUnavailable { .. }), + "expected PgCronUnavailable, got: {result:?}" + ); + } + + #[test] + fn test_map_pgcron_error_invalid_cron_expression_not_misclassified() { + // pg_cron rejects bad cron expressions with an error containing "cron" + let err = sqlx::Error::Protocol( + "error returned from database: invalid cron expression".to_string(), + ); + let result = map_pgcron_error(err, "create"); + assert!( + matches!(result, DurableError::Database(_)), + "expected Database error, got: {result:?}" + ); + } + + #[test] + fn test_map_pgcron_error_unrelated_error() { + let err = sqlx::Error::Protocol("some other database error".to_string()); + let result = map_pgcron_error(err, "create"); + assert!( + matches!(result, DurableError::Database(_)), + "expected Database error, got: {result:?}" + ); + } } diff --git a/tests/cron_test.rs b/tests/cron_test.rs index 87ccc1d..3e40840 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -235,9 +235,14 @@ async fn test_create_schedule_invalid_cron() { metadata: None, }; - // pg_cron validates the expression and the transaction should fail + // pg_cron validates the expression and the transaction should fail. + // Crucially, this should be a Database error, NOT PgCronUnavailable. let result = durable.create_schedule("bad-cron", options).await; - assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + !matches!(err, DurableError::PgCronUnavailable { .. }), + "invalid cron expression should not be classified as PgCronUnavailable, got: {err:?}" + ); cleanup_queue(&pool, queue).await; } From 0115d79b0991f63a9236af05c24e69061259a5a2 Mon Sep 17 00:00:00 2001 From: Viraj Mehta Date: Mon, 2 Mar 2026 18:59:49 -0500 Subject: [PATCH 13/13] simplify error handling --- src/cron.rs | 94 +++------------------------------------------- src/error.rs | 7 ---- tests/cron_test.rs | 9 +---- 3 files changed, 8 insertions(+), 102 deletions(-) diff --git a/src/cron.rs b/src/cron.rs index 9fccd88..7cdd22a 100644 --- a/src/cron.rs +++ b/src/cron.rs @@ -62,7 +62,7 @@ pub struct ScheduleFilter { /// /// # Errors /// -/// Returns [`DurableError::PgCronUnavailable`] if the extension cannot be created +/// Returns [`DurableError::InvalidConfiguration`] if the extension cannot be created /// or is not available. pub async fn setup_pgcron(pool: &PgPool) -> DurableResult<()> { // Attempt to create the extension, ignoring errors (user may not have privileges) @@ -79,7 +79,7 @@ pub async fn setup_pgcron(pool: &PgPool) -> DurableResult<()> { .await?; if !exists.0 { - return Err(DurableError::PgCronUnavailable { + return Err(DurableError::InvalidConfiguration { reason: "pg_cron extension is not installed and could not be created".to_string(), }); } @@ -127,7 +127,7 @@ where .await?; if !exists.0 { - return Err(DurableError::PgCronUnavailable { + return Err(DurableError::InvalidConfiguration { reason: "pg_cron extension is not installed".to_string(), }); } @@ -183,8 +183,7 @@ where .bind(&options.cron_expression) .bind(&spawn_sql) .execute(&mut *tx) - .await - .map_err(|e| map_pgcron_error(e, "create"))?; + .await?; // Upsert into our schedule registry sqlx::query( @@ -247,15 +246,13 @@ where sqlx::query_as("SELECT jobid FROM cron.job WHERE jobname = $1") .bind(&pgcron_job_name) .fetch_optional(&mut *tx) - .await - .map_err(|e| map_pgcron_error(e, "delete"))?; + .await?; if let Some((jobid,)) = job_row { sqlx::query("SELECT cron.unschedule($1)") .bind(jobid) .execute(&mut *tx) - .await - .map_err(|e| map_pgcron_error(e, "delete"))?; + .await?; } // Delete from our registry @@ -404,23 +401,6 @@ fn build_pgcron_spawn_sql( )) } -/// Map pgcron-related SQL errors to more descriptive error messages. -fn map_pgcron_error(err: sqlx::Error, operation: &str) -> DurableError { - let err_str = err.to_string(); - // Only classify as PgCronUnavailable when the schema or function is missing, - // not when pg_cron rejects invalid input (e.g. a bad cron expression). - if err_str.contains("schema \"cron\" does not exist") - || err_str.contains("function cron.schedule") - || err_str.contains("function cron.unschedule") - { - DurableError::PgCronUnavailable { - reason: format!("pg_cron error during {operation}: {err_str}"), - } - } else { - DurableError::Database(err) - } -} - #[cfg(test)] #[expect(clippy::unwrap_used)] mod tests { @@ -513,66 +493,4 @@ mod tests { assert!(sql.contains("my_task")); assert!(sql.contains("::jsonb")); } - - // --- map_pgcron_error tests --- - - #[test] - fn test_map_pgcron_error_schema_missing() { - let err = sqlx::Error::Protocol( - r#"error returned from database: schema "cron" does not exist"#.to_string(), - ); - let result = map_pgcron_error(err, "create"); - assert!( - matches!(result, DurableError::PgCronUnavailable { .. }), - "expected PgCronUnavailable, got: {result:?}" - ); - } - - #[test] - fn test_map_pgcron_error_function_schedule_missing() { - let err = sqlx::Error::Protocol( - "error returned from database: function cron.schedule(unknown, unknown, unknown) does not exist".to_string(), - ); - let result = map_pgcron_error(err, "create"); - assert!( - matches!(result, DurableError::PgCronUnavailable { .. }), - "expected PgCronUnavailable, got: {result:?}" - ); - } - - #[test] - fn test_map_pgcron_error_function_unschedule_missing() { - let err = sqlx::Error::Protocol( - "error returned from database: function cron.unschedule(bigint) does not exist" - .to_string(), - ); - let result = map_pgcron_error(err, "delete"); - assert!( - matches!(result, DurableError::PgCronUnavailable { .. }), - "expected PgCronUnavailable, got: {result:?}" - ); - } - - #[test] - fn test_map_pgcron_error_invalid_cron_expression_not_misclassified() { - // pg_cron rejects bad cron expressions with an error containing "cron" - let err = sqlx::Error::Protocol( - "error returned from database: invalid cron expression".to_string(), - ); - let result = map_pgcron_error(err, "create"); - assert!( - matches!(result, DurableError::Database(_)), - "expected Database error, got: {result:?}" - ); - } - - #[test] - fn test_map_pgcron_error_unrelated_error() { - let err = sqlx::Error::Protocol("some other database error".to_string()); - let result = map_pgcron_error(err, "create"); - assert!( - matches!(result, DurableError::Database(_)), - "expected Database error, got: {result:?}" - ); - } } diff --git a/src/error.rs b/src/error.rs index cd460ee..fcb5192 100644 --- a/src/error.rs +++ b/src/error.rs @@ -438,13 +438,6 @@ pub enum DurableError { state: String, }, - /// pg_cron extension is not available in the database. - #[error("pg_cron is not available: {reason}")] - PgCronUnavailable { - /// Why pg_cron is not available. - reason: String, - }, - /// Schedule name failed validation. #[error("invalid schedule name `{name}`: {reason}")] InvalidScheduleName { diff --git a/tests/cron_test.rs b/tests/cron_test.rs index 3e40840..87ccc1d 100644 --- a/tests/cron_test.rs +++ b/tests/cron_test.rs @@ -235,14 +235,9 @@ async fn test_create_schedule_invalid_cron() { metadata: None, }; - // pg_cron validates the expression and the transaction should fail. - // Crucially, this should be a Database error, NOT PgCronUnavailable. + // pg_cron validates the expression and the transaction should fail let result = durable.create_schedule("bad-cron", options).await; - let err = result.unwrap_err(); - assert!( - !matches!(err, DurableError::PgCronUnavailable { .. }), - "invalid cron expression should not be classified as PgCronUnavailable, got: {err:?}" - ); + assert!(result.is_err()); cleanup_queue(&pool, queue).await; }