From 8d7c709fafda84f9e27548d32699bd5a6886b3f4 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Mon, 16 Mar 2026 17:27:49 -0500 Subject: [PATCH 01/11] feat(amp-worker-gc): add standalone GC job type for control-plane scheduling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract garbage collection from the worker's compaction task into a standalone job type managed by the controller via the job ledger. This decouples GC from compaction so it can run independently regardless of whether materialization jobs are active. New crate: amp-worker-gc with job descriptor, idempotency key, and collection algorithm. Controller schedules GC jobs per active physical table revision on a 60s interval. Workers execute them using the same stream-expired → delete- metadata → delete-files algorithm as the existing Collector. --- Cargo.lock | 19 ++ Cargo.toml | 1 + crates/bin/ampctl/src/cmd/job/list.rs | 1 + crates/core/worker-gc/Cargo.toml | 17 ++ crates/core/worker-gc/src/error.rs | 77 ++++++ crates/core/worker-gc/src/job_ctx.rs | 12 + crates/core/worker-gc/src/job_descriptor.rs | 56 +++++ crates/core/worker-gc/src/job_impl.rs | 129 ++++++++++ crates/core/worker-gc/src/job_key.rs | 21 ++ crates/core/worker-gc/src/job_kind.rs | 50 ++++ crates/core/worker-gc/src/lib.rs | 6 + crates/services/admin-api/Cargo.toml | 1 + crates/services/admin-api/src/scheduler.rs | 7 + crates/services/controller/Cargo.toml | 1 + crates/services/controller/src/scheduler.rs | 78 ++++++ crates/services/controller/src/service.rs | 35 +++ crates/services/worker/Cargo.toml | 1 + crates/services/worker/src/job.rs | 1 + .../services/worker/src/service/job_impl.rs | 236 ++++++++++-------- 19 files changed, 641 insertions(+), 108 deletions(-) create mode 100644 crates/core/worker-gc/Cargo.toml create mode 100644 crates/core/worker-gc/src/error.rs create mode 100644 crates/core/worker-gc/src/job_ctx.rs create mode 100644 crates/core/worker-gc/src/job_descriptor.rs 
create mode 100644 crates/core/worker-gc/src/job_impl.rs create mode 100644 crates/core/worker-gc/src/job_key.rs create mode 100644 crates/core/worker-gc/src/job_kind.rs create mode 100644 crates/core/worker-gc/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 11ef46e25..79923c664 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "axum", "common", @@ -1320,6 +1321,22 @@ dependencies = [ "verification", ] +[[package]] +name = "amp-worker-gc" +version = "0.1.0" +dependencies = [ + "amp-data-store", + "amp-worker-core", + "datasets-common", + "futures", + "metadata-db", + "object_store", + "serde", + "serde_json", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "ampcc" version = "0.1.0" @@ -3253,6 +3270,7 @@ dependencies = [ "amp-datasets-registry", "amp-providers-registry", "amp-worker-core", + "amp-worker-gc", "async-trait", "axum", "common", @@ -14307,6 +14325,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "backon", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 6599b53ca..2b9c31ac3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "crates/core/worker-core", "crates/core/worker-datasets-derived", "crates/core/worker-datasets-raw", + "crates/core/worker-gc", "crates/parquet-ext", "crates/extractors/evm-rpc", "crates/extractors/evm-rpc/gen", diff --git a/crates/bin/ampctl/src/cmd/job/list.rs b/crates/bin/ampctl/src/cmd/job/list.rs index 84972e5fd..d1119788a 100644 --- a/crates/bin/ampctl/src/cmd/job/list.rs +++ b/crates/bin/ampctl/src/cmd/job/list.rs @@ -133,6 +133,7 @@ fn show_dataset_descriptor(descriptor: &serde_json::Value) -> Option { dataset_name = desc.dataset_name, hash = &desc.manifest_hash.as_str()[..7], )), + JobDescriptor::Gc(desc) => Some(format!("gc location:{}", desc.location_id)), } } diff 
--git a/crates/core/worker-gc/Cargo.toml b/crates/core/worker-gc/Cargo.toml new file mode 100644 index 000000000..a4065a536 --- /dev/null +++ b/crates/core/worker-gc/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "amp-worker-gc" +edition.workspace = true +version.workspace = true +license-file.workspace = true + +[dependencies] +amp-data-store = { path = "../data-store" } +amp-worker-core = { path = "../worker-core" } +datasets-common = { path = "../datasets-common" } +futures.workspace = true +metadata-db = { path = "../metadata-db" } +object_store.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tracing.workspace = true diff --git a/crates/core/worker-gc/src/error.rs b/crates/core/worker-gc/src/error.rs new file mode 100644 index 000000000..efb55cd31 --- /dev/null +++ b/crates/core/worker-gc/src/error.rs @@ -0,0 +1,77 @@ +//! Error types for GC job execution. + +use amp_worker_core::{error_detail::ErrorDetailsProvider, retryable::RetryableErrorExt}; + +/// Errors that can occur during garbage collection job execution. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The target physical table revision does not exist. + /// + /// This occurs when the `location_id` in the job descriptor does not match any + /// revision in the metadata database. The revision may have been deleted between + /// scheduling and execution. + /// + /// This is a fatal (non-retryable) error since the location will not reappear. + #[error("location not found: {0}")] + LocationNotFound(metadata_db::physical_table_revision::LocationId), + + /// Failed to query the metadata database for the physical table revision. + /// + /// This occurs when looking up the revision by `location_id` at the start of + /// the GC job. Common causes include database connectivity issues or timeouts. + #[error("metadata database error")] + MetadataDb(#[source] metadata_db::Error), + + /// Failed to stream expired files from the GC manifest. 
+ /// + /// This occurs during step 1 of the collection algorithm when querying + /// `gc_manifest` for files whose expiration has passed. Common causes include + /// database connectivity issues or query timeouts. + #[error("failed to stream expired files")] + FileStream(#[source] metadata_db::Error), + + /// Failed to delete file metadata records from Postgres. + /// + /// This occurs during step 2 of the collection algorithm when removing + /// `file_metadata` rows for expired files. Common causes include database + /// connectivity issues or transaction conflicts. + #[error("failed to delete file metadata")] + FileMetadataDelete(#[source] metadata_db::Error), + + /// Failed to delete a physical file from object storage. + /// + /// This occurs during step 3 of the collection algorithm when removing + /// Parquet files from S3/GCS/local storage. Common causes include network + /// failures, permission errors, or storage service unavailability. + /// + /// Note: `NotFound` errors are tolerated (the file is already gone). + /// Only other object store errors trigger this variant. 
+ #[error("object store error")] + ObjectStore(#[source] object_store::Error), +} + +impl RetryableErrorExt for Error { + fn is_retryable(&self) -> bool { + match self { + Self::LocationNotFound(_) => false, + Self::MetadataDb(_) => true, + Self::FileStream(_) => true, + Self::FileMetadataDelete(_) => true, + Self::ObjectStore(_) => true, + } + } +} + +impl amp_worker_core::retryable::JobErrorExt for Error { + fn error_code(&self) -> &'static str { + match self { + Self::LocationNotFound(_) => "GC_LOCATION_NOT_FOUND", + Self::MetadataDb(_) => "GC_METADATA_DB", + Self::FileStream(_) => "GC_FILE_STREAM", + Self::FileMetadataDelete(_) => "GC_FILE_METADATA_DELETE", + Self::ObjectStore(_) => "GC_OBJECT_STORE", + } + } +} + +impl ErrorDetailsProvider for Error {} diff --git a/crates/core/worker-gc/src/job_ctx.rs b/crates/core/worker-gc/src/job_ctx.rs new file mode 100644 index 000000000..cf10581c1 --- /dev/null +++ b/crates/core/worker-gc/src/job_ctx.rs @@ -0,0 +1,12 @@ +//! Context for GC job execution. + +use amp_data_store::DataStore; +use metadata_db::MetadataDb; + +/// Dependencies required to execute a GC job. +pub struct Context { + /// Connection to the metadata database (Postgres). + pub metadata_db: MetadataDb, + /// Connection to object storage (S3/GCS/local FS). + pub data_store: DataStore, +} diff --git a/crates/core/worker-gc/src/job_descriptor.rs b/crates/core/worker-gc/src/job_descriptor.rs new file mode 100644 index 000000000..32b0352a4 --- /dev/null +++ b/crates/core/worker-gc/src/job_descriptor.rs @@ -0,0 +1,56 @@ +use metadata_db::physical_table_revision::LocationId; + +use crate::job_kind::GcJobKind; + +/// Job descriptor for garbage collection. +/// +/// Contains the fields needed to execute a GC job for a single physical table revision. +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct JobDescriptor { + /// The physical table revision to collect garbage for. 
+ pub location_id: LocationId, +} + +impl From for metadata_db::job_events::EventDetailOwned { + fn from(desc: JobDescriptor) -> Self { + #[derive(serde::Serialize)] + struct Tagged<'a> { + kind: GcJobKind, + #[serde(flatten)] + inner: &'a JobDescriptor, + } + + // SAFETY: `to_raw_value` only fails on non-string map keys which cannot occur + // with a flat struct containing only a primitive integer field. + let raw = serde_json::value::to_raw_value(&Tagged { + kind: GcJobKind, + inner: &desc, + }) + .expect("JobDescriptor serialization is infallible"); + + metadata_db::job_events::EventDetail::from_owned_unchecked(raw) + } +} + +impl TryFrom<&metadata_db::job_events::EventDetail<'_>> for JobDescriptor { + type Error = InvalidJobDescriptorError; + + fn try_from(raw: &metadata_db::job_events::EventDetail<'_>) -> Result { + #[derive(serde::Deserialize)] + struct TaggedOwned { + #[allow(dead_code)] + kind: GcJobKind, + #[serde(flatten)] + inner: JobDescriptor, + } + + let tagged: TaggedOwned = + serde_json::from_str(raw.as_str()).map_err(InvalidJobDescriptorError)?; + Ok(tagged.inner) + } +} + +/// Error returned when an [`EventDetail`] cannot be converted into a [`JobDescriptor`]. +#[derive(Debug, thiserror::Error)] +#[error("invalid job descriptor")] +pub struct InvalidJobDescriptorError(#[source] pub serde_json::Error); diff --git a/crates/core/worker-gc/src/job_impl.rs b/crates/core/worker-gc/src/job_impl.rs new file mode 100644 index 000000000..30251f354 --- /dev/null +++ b/crates/core/worker-gc/src/job_impl.rs @@ -0,0 +1,129 @@ +//! GC job execution. +//! +//! Implements the collection algorithm: stream expired files from the GC manifest, +//! delete their metadata from Postgres, then delete the physical files from object storage. 
+ +use std::collections::{BTreeMap, BTreeSet}; + +use amp_data_store::{DataStore, DeleteFilesStreamError, physical_table::PhyTableRevisionPath}; +use futures::{StreamExt as _, TryStreamExt as _, stream}; +use metadata_db::{ + MetadataDb, files::FileId, gc::GcManifestRow, physical_table_revision::LocationId, +}; +use object_store::{Error as ObjectStoreError, path::Path}; + +use crate::{error::Error, job_ctx::Context, job_descriptor::JobDescriptor}; + +/// Execute a garbage collection job for a single physical table revision. +/// +/// The algorithm: +/// 1. Look up the revision by `location_id` to get its storage path. +/// 2. Stream expired files from the `gc_manifest` table. +/// 3. Delete file metadata rows from Postgres (cascades to `gc_manifest` and `footer_cache`). +/// 4. Delete physical files from object storage. +/// +/// Metadata is deleted before physical files. If the process crashes between steps 3 and 4, +/// orphaned files may remain in storage but no dangling metadata references will exist. +#[tracing::instrument(skip_all, err, fields(location_id = %desc.location_id))] +pub async fn execute(ctx: Context, desc: JobDescriptor) -> Result<(), Error> { + let location_id = desc.location_id; + + // Look up the revision to get its storage path + let revision = + metadata_db::physical_table_revision::get_by_location_id(&ctx.metadata_db, location_id) + .await + .map_err(Error::MetadataDb)? + .ok_or(Error::LocationNotFound(location_id))?; + + let revision_path: PhyTableRevisionPath = revision.path.into(); + + collect( + &ctx.metadata_db, + &ctx.data_store, + location_id, + &revision_path, + ) + .await +} + +/// Run the collection algorithm for a single location. 
+async fn collect( + metadata_db: &MetadataDb, + data_store: &DataStore, + location_id: LocationId, + revision_path: &PhyTableRevisionPath, +) -> Result<(), Error> { + // Step 1: Stream expired files from the GC manifest + let found_file_ids_to_paths: BTreeMap = + metadata_db::gc::stream_expired(metadata_db, location_id) + .map_err(Error::FileStream) + .map(|manifest_row| { + let GcManifestRow { + file_id, + file_path: file_name, + .. + } = manifest_row?; + + let path = revision_path.child(file_name.as_str()); + Ok::<_, Error>((file_id, path)) + }) + .try_collect() + .await?; + + tracing::debug!( + expired_files = found_file_ids_to_paths.len(), + "expired files found" + ); + + if found_file_ids_to_paths.is_empty() { + return Ok(()); + } + + // Step 2: Delete file metadata from Postgres + let file_ids_to_delete: Vec = found_file_ids_to_paths.keys().copied().collect(); + let paths_to_remove: BTreeSet = + metadata_db::files::delete_by_ids(metadata_db, &file_ids_to_delete) + .await + .map_err(Error::FileMetadataDelete)? + .into_iter() + .filter_map(|file_id| found_file_ids_to_paths.get(&file_id).cloned()) + .collect(); + + tracing::debug!( + metadata_entries_deleted = paths_to_remove.len(), + "metadata entries deleted" + ); + + // Step 3: Delete physical files from object storage + let mut delete_stream = + data_store.delete_files_stream(stream::iter(paths_to_remove).map(Ok).boxed()); + + let mut files_deleted: u64 = 0; + let mut files_not_found: u64 = 0; + + while let Some(result) = delete_stream.next().await { + match result { + Ok(path) => { + tracing::debug!(%path, "deleted expired file"); + files_deleted += 1; + } + Err(DeleteFilesStreamError(ObjectStoreError::NotFound { path, .. 
})) => { + tracing::debug!(%path, "expired file not found"); + files_not_found += 1; + } + Err(DeleteFilesStreamError(err)) => { + tracing::warn!( + error = %err, + files_deleted, + files_not_found, + "collection aborted due to object store error" + ); + return Err(Error::ObjectStore(err)); + } + } + } + + tracing::info!(files_deleted, files_not_found, "collection complete"); + + Ok(()) +} diff --git a/crates/core/worker-gc/src/job_key.rs b/crates/core/worker-gc/src/job_key.rs new file mode 100644 index 000000000..059e87b0b --- /dev/null +++ b/crates/core/worker-gc/src/job_key.rs @@ -0,0 +1,21 @@ +//! Idempotency key computation for garbage collection jobs. +//! +//! Computes idempotency keys by combining the GC job kind discriminator +//! with a location ID, producing a deterministic hash that prevents duplicate +//! job scheduling for the same physical table revision. + +use datasets_common::hash::Hash; +use metadata_db::{jobs::IdempotencyKey, physical_table_revision::LocationId}; + +use crate::job_kind::JOB_KIND; + +/// Compute an idempotency key for a GC job. +/// +/// The key is derived by hashing `{job_kind}:{location_id}`, +/// producing a deterministic 64-character hex string. +pub fn idempotency_key(location_id: LocationId) -> IdempotencyKey<'static> { + let input = format!("{JOB_KIND}:{location_id}"); + let hash: Hash = datasets_common::hash::hash(input); + // SAFETY: The hash is a validated 64-char hex string produced by our hash function. + IdempotencyKey::from_owned_unchecked(hash.into_inner()) +} diff --git a/crates/core/worker-gc/src/job_kind.rs b/crates/core/worker-gc/src/job_kind.rs new file mode 100644 index 000000000..1df5d8752 --- /dev/null +++ b/crates/core/worker-gc/src/job_kind.rs @@ -0,0 +1,50 @@ +/// The `kind` discriminator value for garbage collection jobs. +pub const JOB_KIND: &str = "gc"; + +/// Type-safe representation of the garbage collection job kind. 
+/// +/// Serializes as the string [`JOB_KIND`] and only deserializes successfully +/// when the input matches that exact value. +#[derive(Debug)] +pub struct GcJobKind; + +impl std::fmt::Display for GcJobKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(JOB_KIND) + } +} + +impl std::str::FromStr for GcJobKind { + type Err = JobKindMismatchError; + + fn from_str(s: &str) -> Result { + if s != JOB_KIND { + return Err(JobKindMismatchError(s.to_owned())); + } + Ok(GcJobKind) + } +} + +impl serde::Serialize for GcJobKind { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(JOB_KIND) + } +} + +impl<'de> serde::Deserialize<'de> for GcJobKind { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + s.parse().map_err(serde::de::Error::custom) + } +} + +/// Error returned when a string does not match the expected [`JOB_KIND`]. 
+#[derive(Debug, thiserror::Error)] +#[error("job kind mismatch: expected `{JOB_KIND}`, got `{0}`")] +pub struct JobKindMismatchError(String); diff --git a/crates/core/worker-gc/src/lib.rs b/crates/core/worker-gc/src/lib.rs new file mode 100644 index 000000000..8b4451961 --- /dev/null +++ b/crates/core/worker-gc/src/lib.rs @@ -0,0 +1,6 @@ +pub mod error; +pub mod job_ctx; +pub mod job_descriptor; +pub mod job_impl; +pub mod job_key; +pub mod job_kind; diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index dd87f638e..3219f9953 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -14,6 +14,7 @@ amp-providers-registry = { path = "../../core/providers-registry" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index c093d732d..2861e6e7e 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -217,6 +217,13 @@ impl From for JobDes } } +impl From for JobDescriptor { + fn from(desc: amp_worker_gc::job_descriptor::JobDescriptor) -> Self { + let raw: EventDetailOwned = desc.into(); + Self(raw.into_inner()) + } +} + /// Errors that can occur when scheduling a dataset dump job #[derive(Debug, thiserror::Error)] pub enum ScheduleJobError { diff --git a/crates/services/controller/Cargo.toml b/crates/services/controller/Cargo.toml index 5326f91c8..e32a5f286 100644 --- a/crates/services/controller/Cargo.toml +++ b/crates/services/controller/Cargo.toml @@ -10,6 +10,7 @@ amp-data-store = { path = "../../core/data-store" } amp-datasets-registry = { version = 
"0.1.0", path = "../../core/datasets-registry" } amp-providers-registry = { version = "0.1.0", path = "../../core/providers-registry" } amp-worker-core = { path = "../../core/worker-core" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 84ebb8cbc..526e4596b 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -356,6 +356,84 @@ impl Scheduler { Ok(()) } + + /// Schedule GC jobs for all active physical table revisions. + /// + /// Paginates through all active revisions and schedules a GC job for each one. + /// The idempotency key (`gc:{location_id}`) prevents duplicate jobs for the same + /// location. If a GC job is already scheduled or running for a location, the existing + /// job ID is returned without creating a new one. + pub async fn schedule_gc_jobs(&self) -> Result<(), ScheduleGcJobsError> { + let page_size: i64 = 100; + let mut last_id: Option = None; + + loop { + let revisions = metadata_db::physical_table_revision::list( + &self.metadata_db, + page_size, + last_id, + Some(true), + ) + .await + .map_err(ScheduleGcJobsError::ListRevisions)?; + + if revisions.is_empty() { + break; + } + + for revision in &revisions { + let idempotency_key = amp_worker_gc::job_key::idempotency_key(revision.id); + let descriptor = admin_api::scheduler::JobDescriptor::from( + amp_worker_gc::job_descriptor::JobDescriptor { + location_id: revision.id, + }, + ); + + match self + .schedule_job_impl(idempotency_key, descriptor, None) + .await + { + Ok(job_id) => { + tracing::debug!( + location_id = %revision.id, + %job_id, + "scheduled GC job" + ); + } + Err(ScheduleJobError::NoWorkersAvailable) => { + tracing::warn!("no workers available for GC scheduling"); + return Ok(()); + } + Err(ScheduleJobError::ActiveJobConflict { .. 
}) => { + // GC job already running for this location, skip + } + Err(err) => { + tracing::error!( + location_id = %revision.id, + error = %err, + error_source = logging::error_source(&err), + "failed to schedule GC job" + ); + } + } + } + + last_id = revisions.last().map(|r| r.id); + if revisions.len() < page_size as usize { + break; + } + } + + Ok(()) + } +} + +/// Errors that occur when scheduling GC jobs [`Scheduler::schedule_gc_jobs`] +#[derive(Debug, thiserror::Error)] +pub enum ScheduleGcJobsError { + /// Failed to list active physical table revisions + #[error("failed to list active revisions")] + ListRevisions(#[source] metadata_db::Error), } /// Errors that occur during failed job reconciliation [`Scheduler::reconcile_failed_jobs`] diff --git a/crates/services/controller/src/service.rs b/crates/services/controller/src/service.rs index 276d7e4c7..73aceda6b 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -24,6 +24,12 @@ use crate::{build_info::BuildInfo, scheduler::Scheduler}; /// The scheduler checks for failed jobs ready for retry at this interval. const RECONCILIATION_INTERVAL: Duration = Duration::from_secs(60); +/// Interval between GC scheduling sweeps. +/// +/// The controller scans for active physical table revisions and schedules +/// GC jobs at this interval. +const GC_SCHEDULING_INTERVAL: Duration = Duration::from_secs(60); + /// Create and initialize the controller service /// /// Sets up the admin API server with the scheduler, dataset store, and metadata database. 
@@ -94,6 +100,24 @@ pub async fn new( } }); + // Spawn background task for GC job scheduling + let scheduler_for_gc = scheduler.clone(); + let gc_scheduling_task = tokio::spawn(async move { + let mut interval = tokio::time::interval(GC_SCHEDULING_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + interval.tick().await; + if let Err(err) = scheduler_for_gc.schedule_gc_jobs().await { + tracing::error!( + error = %err, + error_source = monitoring::logging::error_source(&err), + "GC job scheduling failed" + ); + } + } + }); + // The service future that runs the server with graceful shutdown let server = async move { let server_future = axum::serve(listener, router).with_graceful_shutdown(shutdown_signal()); @@ -105,6 +129,9 @@ pub async fn new( _ = reconciliation_task => { Err(ServerError::ReconciliationTerminated) } + _ = gc_scheduling_task => { + Err(ServerError::GcSchedulingTerminated) + } } }; Ok((addr, server)) @@ -164,6 +191,14 @@ pub enum ServerError { /// - Task is cancelled externally #[error("Reconciliation task terminated unexpectedly")] ReconciliationTerminated, + + /// Background GC scheduling task terminated unexpectedly + /// + /// This occurs when: + /// - GC scheduling task panics + /// - Task is cancelled externally + #[error("GC scheduling task terminated unexpectedly")] + GcSchedulingTerminated, } /// Returns a future that completes when a shutdown signal is received. 
diff --git a/crates/services/worker/Cargo.toml b/crates/services/worker/Cargo.toml index 2bc0ea6ab..9d687b743 100644 --- a/crates/services/worker/Cargo.toml +++ b/crates/services/worker/Cargo.toml @@ -10,6 +10,7 @@ amp-data-store = { path = "../../core/data-store" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true backon.workspace = true chrono.workspace = true diff --git a/crates/services/worker/src/job.rs b/crates/services/worker/src/job.rs index a9988f48b..6efebf397 100644 --- a/crates/services/worker/src/job.rs +++ b/crates/services/worker/src/job.rs @@ -56,4 +56,5 @@ impl From for metadata_db::jobs::Job { pub enum JobDescriptor { MaterializeRaw(amp_worker_datasets_raw::job_descriptor::JobDescriptor), MaterializeDerived(amp_worker_datasets_derived::job_descriptor::JobDescriptor), + Gc(amp_worker_gc::job_descriptor::JobDescriptor), } diff --git a/crates/services/worker/src/service/job_impl.rs b/crates/services/worker/src/service/job_impl.rs index 68ddb364a..f0720f179 100644 --- a/crates/services/worker/src/service/job_impl.rs +++ b/crates/services/worker/src/service/job_impl.rs @@ -1,6 +1,6 @@ //! Internal job implementation for the worker service. -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use amp_worker_core::{ ProgressReporter, @@ -16,141 +16,158 @@ use crate::{ events::WorkerProgressReporter, job::JobDescriptor, kafka::proto, service::WorkerJobCtx, }; -/// Create and run a worker job that materializes tables from a dataset. +/// Create and run a worker job. /// -/// This function returns a future that executes the materialization operation. -/// Raw datasets are handled by `amp_worker_datasets_raw` and derived -/// datasets are handled by `amp_worker_datasets_derived`. 
-pub(super) fn new( +/// This function returns a future that executes the job operation. +/// Raw datasets are handled by `amp_worker_datasets_raw`, derived +/// datasets by `amp_worker_datasets_derived`, and garbage collection +/// by `amp_worker_gc`. +pub(super) async fn new( job_ctx: WorkerJobCtx, job_desc: JobDescriptor, job_id: JobId, -) -> impl Future> { - let reference = match &job_desc { - JobDescriptor::MaterializeRaw(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - JobDescriptor::MaterializeDerived(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - }; - - let metrics = job_ctx - .meter - .as_ref() - .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); - - // Create progress reporter for event streaming - // Always create the reporter - NoOpEmitter will discard events if not needed - let progress_reporter: Option> = { - let dataset_info = proto::DatasetInfo { - namespace: reference.namespace().to_string(), - name: reference.name().to_string(), - manifest_hash: reference.hash().to_string(), - }; - Some(Arc::new(WorkerProgressReporter::new( - job_id, - dataset_info, - job_ctx.event_emitter.clone(), - ))) - }; - - let writer: metadata_db::jobs::JobId = job_id.into(); - async move { - match job_desc { - JobDescriptor::MaterializeRaw(desc) => { - let ctx = amp_worker_datasets_raw::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_raw::job_ctx::Config { - poll_interval: job_ctx.config.poll_interval, - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - 
notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, +) -> Result<(), JobError> { + match job_desc { + JobDescriptor::Gc(desc) => { + let ctx = amp_worker_gc::job_ctx::Context { + metadata_db: job_ctx.metadata_db.clone(), + data_store: job_ctx.data_store.clone(), + }; + + amp_worker_gc::job_impl::execute(ctx, desc) + .instrument(info_span!("gc_job", %job_id)) + .await + .map_err(JobError::Gc)?; + } + JobDescriptor::MaterializeRaw(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_raw::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_raw::job_ctx::Config { + poll_interval: job_ctx.config.poll_interval, + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) .instrument( info_span!("materialize_raw_job", %job_id, dataset = %format!("{reference:#}")), ) 
.await .map_err(JobError::MaterializeRaw)?; - } - JobDescriptor::MaterializeDerived(desc) => { - let ctx = amp_worker_datasets_derived::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_derived::job_ctx::Config { - keep_alive_interval: job_ctx.config.keep_alive_interval, - max_mem_mb: job_ctx.config.max_mem_mb, - query_max_mem_mb: job_ctx.config.query_max_mem_mb, - spill_location: job_ctx.config.spill_location.clone(), - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - microbatch_max_interval: job_ctx.config.microbatch_max_interval, - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - isolate_pool: job_ctx.isolate_pool.clone(), - notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, + } + JobDescriptor::MaterializeDerived(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_derived::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_derived::job_ctx::Config { + keep_alive_interval: job_ctx.config.keep_alive_interval, + max_mem_mb: 
job_ctx.config.max_mem_mb, + query_max_mem_mb: job_ctx.config.query_max_mem_mb, + spill_location: job_ctx.config.spill_location.clone(), + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + microbatch_max_interval: job_ctx.config.microbatch_max_interval, + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + isolate_pool: job_ctx.isolate_pool.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) .instrument(info_span!("materialize_derived_job", %job_id, dataset = %format!("{reference:#}"))) .await .map_err(JobError::MaterializeDerived)?; - } } - - Ok(()) } + + Ok(()) } -/// Errors from dataset materialization job execution. +/// Errors from worker job execution. /// -/// Wraps the specific error types from raw and derived dataset materialization operations -/// to provide a unified error type for the worker job system. +/// Wraps the specific error types from each job type to provide a unified +/// error type for the worker job system. #[derive(Debug, thiserror::Error)] pub(crate) enum JobError { - /// Raw dataset materialization operation failed - /// - /// This occurs when the raw dataset extraction and Parquet file writing - /// process encounters an error. Common causes include blockchain client - /// connectivity issues, consistency check failures, and partition task errors. + /// Raw dataset materialization operation failed. 
#[error("Failed to materialize raw dataset")] MaterializeRaw(#[source] amp_worker_datasets_raw::job_impl::Error), - /// Derived dataset materialization operation failed - /// - /// This occurs when the derived dataset SQL query execution and Parquet - /// file writing process encounters an error. Common causes include query - /// environment creation failures, manifest retrieval errors, and table - /// materialization failures. + /// Derived dataset materialization operation failed. #[error("Failed to materialize derived dataset")] MaterializeDerived(#[source] amp_worker_datasets_derived::job_impl::Error), + + /// Garbage collection job failed. + #[error("Failed to run garbage collection")] + Gc(#[source] amp_worker_gc::error::Error), } impl JobErrorExt for JobError { @@ -158,6 +175,7 @@ impl JobErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.error_code(), Self::MaterializeDerived(err) => err.error_code(), + Self::Gc(err) => err.error_code(), } } } @@ -167,6 +185,7 @@ impl ErrorDetailsProvider for JobError { match self { Self::MaterializeRaw(err) => Some(err), Self::MaterializeDerived(err) => Some(err), + Self::Gc(err) => Some(err), } } } @@ -176,6 +195,7 @@ impl RetryableErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.is_retryable(), Self::MaterializeDerived(err) => err.is_retryable(), + Self::Gc(err) => err.is_retryable(), } } } From 91ec827d2e8ee5e303ef08b9127b65b689d880d5 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 17 Mar 2026 17:42:28 -0500 Subject: [PATCH 02/11] feat(amp-config): add GC scheduling config toggle for controller Add GcSchedulingConfig with `enabled` (default false) and `interval` (default 60s) fields so GC scheduling can be toggled without code changes. The controller only spawns the GC scheduling task when enabled, preventing unintended GC job creation on deployment. 
--- Cargo.lock | 1 + crates/bin/ampd/src/controller_cmd.rs | 1 + crates/bin/ampd/src/solo_cmd.rs | 1 + crates/config/src/config_file.rs | 4 ++ crates/config/src/controller.rs | 36 +++++++++++++- crates/config/src/lib.rs | 4 ++ crates/services/controller/Cargo.toml | 1 + crates/services/controller/src/service.rs | 49 ++++++++++--------- .../src/testlib/fixtures/daemon_controller.rs | 1 + 9 files changed, 74 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79923c664..702af8255 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3266,6 +3266,7 @@ name = "controller" version = "0.1.0" dependencies = [ "admin-api", + "amp-config", "amp-data-store", "amp-datasets-registry", "amp-providers-registry", diff --git a/crates/bin/ampd/src/controller_cmd.rs b/crates/bin/ampd/src/controller_cmd.rs index 8609b47b6..08d645e7b 100644 --- a/crates/bin/ampd/src/controller_cmd.rs +++ b/crates/bin/ampd/src/controller_cmd.rs @@ -72,6 +72,7 @@ pub async fn run( ethcall_udfs_cache, meter, at, + config.gc_scheduling, ) .await .map_err(Error::ServiceInit)?; diff --git a/crates/bin/ampd/src/solo_cmd.rs b/crates/bin/ampd/src/solo_cmd.rs index 9ef623895..ff2d562a4 100644 --- a/crates/bin/ampd/src/solo_cmd.rs +++ b/crates/bin/ampd/src/solo_cmd.rs @@ -136,6 +136,7 @@ pub async fn run( ethcall_udfs_cache.clone(), meter.clone(), config.controller_addrs.admin_api_addr, + config.gc_scheduling.clone(), ) .await .map_err(Error::ServiceInit)?; diff --git a/crates/config/src/config_file.rs b/crates/config/src/config_file.rs index ea3215a40..cd5a63211 100644 --- a/crates/config/src/config_file.rs +++ b/crates/config/src/config_file.rs @@ -201,6 +201,10 @@ pub struct ConfigFile { // Worker event streaming configuration #[serde(default)] pub worker_events: crate::WorkerEventsConfig, + + // Controller GC scheduling configuration + #[serde(default)] + pub gc_scheduling: crate::controller::GcSchedulingConfig, } impl ConfigFile { diff --git a/crates/config/src/controller.rs 
b/crates/config/src/controller.rs index 635d85697..63101b439 100644 --- a/crates/config/src/controller.rs +++ b/crates/config/src/controller.rs @@ -1,6 +1,9 @@ -use std::net::{AddrParseError, SocketAddr}; +use std::{ + net::{AddrParseError, SocketAddr}, + time::Duration, +}; -use crate::config_file::ConfigFile; +use crate::{config_file::ConfigFile, worker_core::ConfigDuration}; /// Default port for the Admin API server. pub const DEFAULT_CONTROLLER_ADMIN_API_PORT: u16 = 1610; @@ -12,6 +15,35 @@ pub struct ControllerAddrs { pub admin_api_addr: SocketAddr, } +/// Configuration for controller-managed garbage collection scheduling. +/// +/// Controls whether the controller schedules GC jobs and at what interval. +/// GC jobs are executed by workers, not the controller. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct GcSchedulingConfig { + /// Enable GC job scheduling (default: `false`). + /// + /// When `false`, the controller will not schedule any GC jobs. + /// Enable only after verifying the legacy worker-internal GC is disabled. + #[serde(default)] + pub enabled: bool, + + /// Interval in seconds between GC scheduling sweeps (default: 60). + /// + /// The controller scans for active physical table revisions at this + /// interval and schedules GC jobs for each one. + #[serde(default)] + pub interval: ConfigDuration<60>, +} + +impl GcSchedulingConfig { + /// Returns the scheduling interval as a [`Duration`]. 
+ pub fn interval_duration(&self) -> Duration { + self.interval.clone().into() + } +} + impl Default for ControllerAddrs { fn default() -> Self { Self { diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index f6d12f0c0..f15fd9f0e 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -20,6 +20,7 @@ pub use self::{ DEFAULT_MANIFESTS_DIRNAME, DEFAULT_MICROBATCH_MAX_INTERVAL, DEFAULT_PROVIDERS_DIRNAME, DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL, no_defaults_override, }, + controller::GcSchedulingConfig, metadb::{DEFAULT_METADB_CONN_POOL_SIZE, DEFAULT_METADB_DIRNAME, MetadataDbConfig}, redacted::Redacted, }; @@ -112,6 +113,7 @@ fn resolve_config( poll_interval: config_file.poll_interval_secs.into(), keep_alive_interval: config_file.keep_alive_interval, worker_events, + gc_scheduling: config_file.gc_scheduling, }) } @@ -230,6 +232,8 @@ pub struct Config { pub keep_alive_interval: u64, /// Worker event streaming configuration. pub worker_events: WorkerEventsConfig, + /// GC scheduling configuration for the controller. + pub gc_scheduling: GcSchedulingConfig, } /// Configuration for worker event streaming. 
diff --git a/crates/services/controller/Cargo.toml b/crates/services/controller/Cargo.toml index e32a5f286..f3404dde9 100644 --- a/crates/services/controller/Cargo.toml +++ b/crates/services/controller/Cargo.toml @@ -6,6 +6,7 @@ license-file.workspace = true [dependencies] admin-api = { path = "../admin-api" } +amp-config = { path = "../../config" } amp-data-store = { path = "../../core/data-store" } amp-datasets-registry = { version = "0.1.0", path = "../../core/datasets-registry" } amp-providers-registry = { version = "0.1.0", path = "../../core/providers-registry" } diff --git a/crates/services/controller/src/service.rs b/crates/services/controller/src/service.rs index 73aceda6b..25276ff94 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -24,12 +24,6 @@ use crate::{build_info::BuildInfo, scheduler::Scheduler}; /// The scheduler checks for failed jobs ready for retry at this interval. const RECONCILIATION_INTERVAL: Duration = Duration::from_secs(60); -/// Interval between GC scheduling sweeps. -/// -/// The controller scans for active physical table revisions and schedules -/// GC jobs at this interval. -const GC_SCHEDULING_INTERVAL: Duration = Duration::from_secs(60); - /// Create and initialize the controller service /// /// Sets up the admin API server with the scheduler, dataset store, and metadata database. 
@@ -48,6 +42,7 @@ pub async fn new( ethcall_udfs_cache: EthCallUdfsCache, meter: Option, at: SocketAddr, + gc_config: amp_config::GcSchedulingConfig, ) -> Result<(SocketAddr, impl Future>), Error> { let build_info = build_info.into(); @@ -100,22 +95,26 @@ pub async fn new( } }); - // Spawn background task for GC job scheduling - let scheduler_for_gc = scheduler.clone(); - let gc_scheduling_task = tokio::spawn(async move { - let mut interval = tokio::time::interval(GC_SCHEDULING_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - loop { - interval.tick().await; - if let Err(err) = scheduler_for_gc.schedule_gc_jobs().await { - tracing::error!( - error = %err, - error_source = monitoring::logging::error_source(&err), - "GC job scheduling failed" - ); + // Optionally spawn background task for GC job scheduling + let gc_scheduling_task = gc_config.enabled.then(|| { + let scheduler_for_gc = scheduler.clone(); + let gc_interval = gc_config.interval_duration(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(gc_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + interval.tick().await; + if let Err(err) = scheduler_for_gc.schedule_gc_jobs().await { + tracing::error!( + error = %err, + error_source = monitoring::logging::error_source(&err), + "GC job scheduling failed" + ); + } } - } + }) }); // The service future that runs the server with graceful shutdown @@ -129,7 +128,13 @@ pub async fn new( _ = reconciliation_task => { Err(ServerError::ReconciliationTerminated) } - _ = gc_scheduling_task => { + _ = async { + if let Some(task) = gc_scheduling_task { + task.await.ok(); + } else { + std::future::pending::<()>().await; + } + } => { Err(ServerError::GcSchedulingTerminated) } } diff --git a/tests/src/testlib/fixtures/daemon_controller.rs b/tests/src/testlib/fixtures/daemon_controller.rs index d8d93058e..cb2b05f0b 100644 --- a/tests/src/testlib/fixtures/daemon_controller.rs +++ 
b/tests/src/testlib/fixtures/daemon_controller.rs @@ -59,6 +59,7 @@ impl DaemonController { ethcall_udfs_cache, meter, admin_api_addr, + amp_config::GcSchedulingConfig::default(), ) .await?; From 85257d794f5bfed3e4586fe9f39bee003b92eab7 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 17 Mar 2026 21:23:52 -0500 Subject: [PATCH 03/11] feat(amp-worker-gc): add metrics, last_success_at scheduling, and integration tests - Add GcMetrics (expired_files_found, metadata_entries_deleted, files_deleted, files_not_found) with OpenTelemetry counters keyed by location_id - Add last_success_at check in schedule_gc_jobs() to respect the configured interval between GC runs per location (RFC compliance) - Add 3 integration tests in tests/src/tests/it_gc.rs verifying the full collection algorithm against real Postgres + filesystem - Update config.sample.toml with [gc_scheduling] section --- Cargo.lock | 3 + crates/core/worker-gc/Cargo.toml | 1 + crates/core/worker-gc/src/job_ctx.rs | 3 + crates/core/worker-gc/src/job_impl.rs | 18 +- crates/core/worker-gc/src/lib.rs | 1 + crates/core/worker-gc/src/metrics.rs | 62 +++++ crates/services/controller/src/scheduler.rs | 41 ++- crates/services/controller/src/service.rs | 2 +- .../services/worker/src/service/job_impl.rs | 1 + docs/config.sample.toml | 7 + docs/schemas/config/ampd.spec.json | 18 ++ tests/Cargo.toml | 2 + tests/src/tests/it_gc.rs | 260 ++++++++++++++++++ tests/src/tests/mod.rs | 1 + 14 files changed, 413 insertions(+), 7 deletions(-) create mode 100644 crates/core/worker-gc/src/metrics.rs create mode 100644 tests/src/tests/it_gc.rs diff --git a/Cargo.lock b/Cargo.lock index 702af8255..3e2992bea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,6 +1330,7 @@ dependencies = [ "datasets-common", "futures", "metadata-db", + "monitoring", "object_store", "serde", "serde_json", @@ -12425,6 +12426,7 @@ dependencies = [ "amp-providers-registry", "amp-providers-solana", "amp-worker-core", + "amp-worker-gc", "ampctl", "anyhow", 
"arrow-flight", @@ -12451,6 +12453,7 @@ dependencies = [ "object_store", "opentelemetry", "opentelemetry_sdk", + "pgtemp", "pretty_assertions", "rand 0.9.2", "reqwest 0.13.2", diff --git a/crates/core/worker-gc/Cargo.toml b/crates/core/worker-gc/Cargo.toml index a4065a536..3b81d5673 100644 --- a/crates/core/worker-gc/Cargo.toml +++ b/crates/core/worker-gc/Cargo.toml @@ -10,6 +10,7 @@ amp-worker-core = { path = "../worker-core" } datasets-common = { path = "../datasets-common" } futures.workspace = true metadata-db = { path = "../metadata-db" } +monitoring = { path = "../monitoring" } object_store.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/core/worker-gc/src/job_ctx.rs b/crates/core/worker-gc/src/job_ctx.rs index cf10581c1..696fcb491 100644 --- a/crates/core/worker-gc/src/job_ctx.rs +++ b/crates/core/worker-gc/src/job_ctx.rs @@ -2,6 +2,7 @@ use amp_data_store::DataStore; use metadata_db::MetadataDb; +use monitoring::telemetry::metrics::Meter; /// Dependencies required to execute a GC job. pub struct Context { @@ -9,4 +10,6 @@ pub struct Context { pub metadata_db: MetadataDb, /// Connection to object storage (S3/GCS/local FS). pub data_store: DataStore, + /// OpenTelemetry meter for recording GC metrics. + pub meter: Option, } diff --git a/crates/core/worker-gc/src/job_impl.rs b/crates/core/worker-gc/src/job_impl.rs index 30251f354..e4f7ba8bc 100644 --- a/crates/core/worker-gc/src/job_impl.rs +++ b/crates/core/worker-gc/src/job_impl.rs @@ -12,7 +12,7 @@ use metadata_db::{ }; use object_store::{Error as ObjectStoreError, path::Path}; -use crate::{error::Error, job_ctx::Context, job_descriptor::JobDescriptor}; +use crate::{error::Error, job_ctx::Context, job_descriptor::JobDescriptor, metrics::GcMetrics}; /// Execute a garbage collection job for a single physical table revision. 
/// @@ -36,12 +36,14 @@ pub async fn execute(ctx: Context, desc: JobDescriptor) -> Result<(), Error> { .ok_or(Error::LocationNotFound(location_id))?; let revision_path: PhyTableRevisionPath = revision.path.into(); + let metrics = ctx.meter.as_ref().map(|m| GcMetrics::new(m, *location_id)); collect( &ctx.metadata_db, &ctx.data_store, location_id, &revision_path, + metrics.as_ref(), ) .await } @@ -52,6 +54,7 @@ async fn collect( data_store: &DataStore, location_id: LocationId, revision_path: &PhyTableRevisionPath, + metrics: Option<&GcMetrics>, ) -> Result<(), Error> { // Step 1: Stream expired files from the GC manifest let found_file_ids_to_paths: BTreeMap = @@ -79,6 +82,10 @@ async fn collect( return Ok(()); } + if let Some(m) = metrics { + m.record_expired_files_found(found_file_ids_to_paths.len() as u64); + } + // Step 2: Delete file metadata from Postgres let file_ids_to_delete: Vec = found_file_ids_to_paths.keys().copied().collect(); let paths_to_remove: BTreeSet = @@ -94,6 +101,10 @@ async fn collect( "metadata entries deleted" ); + if let Some(m) = metrics { + m.record_metadata_entries_deleted(paths_to_remove.len() as u64); + } + // Step 3: Delete physical files from object storage let mut delete_stream = data_store.delete_files_stream(stream::iter(paths_to_remove).map(Ok).boxed()); @@ -123,6 +134,11 @@ async fn collect( } } + if let Some(m) = metrics { + m.record_files_deleted(files_deleted); + m.record_files_not_found(files_not_found); + } + tracing::info!(files_deleted, files_not_found, "collection complete"); Ok(()) diff --git a/crates/core/worker-gc/src/lib.rs b/crates/core/worker-gc/src/lib.rs index 8b4451961..3619a08ef 100644 --- a/crates/core/worker-gc/src/lib.rs +++ b/crates/core/worker-gc/src/lib.rs @@ -4,3 +4,4 @@ pub mod job_descriptor; pub mod job_impl; pub mod job_key; pub mod job_kind; +pub mod metrics; diff --git a/crates/core/worker-gc/src/metrics.rs b/crates/core/worker-gc/src/metrics.rs new file mode 100644 index 000000000..9286cbd63 --- 
/dev/null +++ b/crates/core/worker-gc/src/metrics.rs @@ -0,0 +1,62 @@ +//! Metrics for GC job execution. + +use monitoring::telemetry::metrics::{Counter, KeyValue, Meter}; + +/// Metrics recorded during garbage collection execution. +pub struct GcMetrics { + location_id: i64, + expired_files_found: Counter, + metadata_entries_deleted: Counter, + files_deleted: Counter, + files_not_found: Counter, +} + +impl GcMetrics { + /// Create a new metrics instance for a GC job. + pub fn new(meter: &Meter, location_id: i64) -> Self { + Self { + location_id, + expired_files_found: Counter::new( + meter, + "gc_expired_files_found", + "Number of expired files found in the GC manifest", + ), + metadata_entries_deleted: Counter::new( + meter, + "gc_metadata_entries_deleted", + "Number of file metadata entries deleted from Postgres", + ), + files_deleted: Counter::new( + meter, + "gc_files_deleted", + "Number of physical files deleted from object storage", + ), + files_not_found: Counter::new( + meter, + "gc_files_not_found", + "Number of expired files already missing from object storage", + ), + } + } + + fn kvs(&self) -> [KeyValue; 1] { + [KeyValue::new("location_id", self.location_id)] + } + + pub fn record_expired_files_found(&self, count: u64) { + self.expired_files_found.inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_metadata_entries_deleted(&self, count: u64) { + self.metadata_entries_deleted + .inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_files_deleted(&self, count: u64) { + self.files_deleted.inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_files_not_found(&self, count: u64) { + self.files_not_found.inc_by_with_kvs(count, &self.kvs()); + } +} diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 526e4596b..2c95df293 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -359,11 +359,14 @@ impl Scheduler { /// Schedule GC jobs for 
all active physical table revisions. /// - /// Paginates through all active revisions and schedules a GC job for each one. - /// The idempotency key (`gc:{location_id}`) prevents duplicate jobs for the same - /// location. If a GC job is already scheduled or running for a location, the existing - /// job ID is returned without creating a new one. - pub async fn schedule_gc_jobs(&self) -> Result<(), ScheduleGcJobsError> { + /// Paginates through all active revisions and schedules a GC job for each one, + /// respecting the configured interval. For each location: + /// + /// 1. Look up the existing job by idempotency key. + /// 2. If in-flight (non-terminal state), skip. + /// 3. If completed recently (`now - updated_at < gc_interval`), skip. + /// 4. Otherwise, schedule a new job. + pub async fn schedule_gc_jobs(&self, gc_interval: Duration) -> Result<(), ScheduleGcJobsError> { let page_size: i64 = 100; let mut last_id: Option = None; @@ -383,6 +386,30 @@ impl Scheduler { for revision in &revisions { let idempotency_key = amp_worker_gc::job_key::idempotency_key(revision.id); + + // Check if a GC job already exists for this location + let existing = + metadata_db::jobs::get_by_idempotency_key(&self.metadata_db, &idempotency_key) + .await + .map_err(ScheduleGcJobsError::CheckExistingJob)?; + + if let Some(job) = &existing { + // Skip if job is still in-flight + if !job.status.is_terminal() { + continue; + } + + // Skip if last completion was too recent + if job.status == JobStatus::Completed.into() { + let now = std::time::SystemTime::now(); + let updated: std::time::SystemTime = job.updated_at.into(); + let elapsed = now.duration_since(updated).unwrap_or_default(); + if elapsed < gc_interval { + continue; + } + } + } + let descriptor = admin_api::scheduler::JobDescriptor::from( amp_worker_gc::job_descriptor::JobDescriptor { location_id: revision.id, @@ -434,6 +461,10 @@ pub enum ScheduleGcJobsError { /// Failed to list active physical table revisions #[error("failed 
to list active revisions")] ListRevisions(#[source] metadata_db::Error), + + /// Failed to check existing job for a location + #[error("failed to check existing GC job")] + CheckExistingJob(#[source] metadata_db::Error), } /// Errors that occur during failed job reconciliation [`Scheduler::reconcile_failed_jobs`] diff --git a/crates/services/controller/src/service.rs b/crates/services/controller/src/service.rs index 25276ff94..f22279b49 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -106,7 +106,7 @@ pub async fn new( loop { interval.tick().await; - if let Err(err) = scheduler_for_gc.schedule_gc_jobs().await { + if let Err(err) = scheduler_for_gc.schedule_gc_jobs(gc_interval).await { tracing::error!( error = %err, error_source = monitoring::logging::error_source(&err), diff --git a/crates/services/worker/src/service/job_impl.rs b/crates/services/worker/src/service/job_impl.rs index f0720f179..4854e0723 100644 --- a/crates/services/worker/src/service/job_impl.rs +++ b/crates/services/worker/src/service/job_impl.rs @@ -32,6 +32,7 @@ pub(super) async fn new( let ctx = amp_worker_gc::job_ctx::Context { metadata_db: job_ctx.metadata_db.clone(), data_store: job_ctx.data_store.clone(), + meter: job_ctx.meter.clone(), }; amp_worker_gc::job_impl::execute(ctx, desc) diff --git a/docs/config.sample.toml b/docs/config.sample.toml index b403214bb..1e0c26832 100644 --- a/docs/config.sample.toml +++ b/docs/config.sample.toml @@ -105,6 +105,13 @@ url = "postgres://" # min_interval = 30.0 # Interval in seconds to run the garbage collector (default: 30.0) # deletion_lock_duration = 1800.0 # Duration in seconds to hold deletion lock on compacted files (default: 1800.0 = 30 minutes) +# Controller-managed garbage collection scheduling +# When enabled, the controller schedules GC jobs via the job ledger for workers to execute. +# This is separate from the legacy worker-internal GC ([writer.collector]). 
+[gc_scheduling] +# enabled = false # Enable GC job scheduling (default: false) +# interval = 60.0 # Interval in seconds between GC scheduling sweeps (default: 60.0) + # Worker configuration [worker_events] # enabled = false # Enable event streaming to Kafka (default: false) diff --git a/docs/schemas/config/ampd.spec.json b/docs/schemas/config/ampd.spec.json index b231ed9b2..3260177e1 100644 --- a/docs/schemas/config/ampd.spec.json +++ b/docs/schemas/config/ampd.spec.json @@ -23,6 +23,9 @@ "null" ] }, + "gc_scheduling": { + "$ref": "#/$defs/GcSchedulingConfig" + }, "jsonl_addr": { "description": "JSON Lines server address (default: \"0.0.0.0:1603\")", "type": [ @@ -202,6 +205,21 @@ "description": "Duration in seconds (floating-point)", "type": "number" }, + "GcSchedulingConfig": { + "description": "Configuration for controller-managed garbage collection scheduling.\n\nControls whether the controller schedules GC jobs and at what interval.\nGC jobs are executed by workers, not the controller.", + "type": "object", + "properties": { + "enabled": { + "description": "Enable GC job scheduling (default: `false`).\n\nWhen `false`, the controller will not schedule any GC jobs.\nEnable only after verifying the legacy worker-internal GC is disabled.", + "type": "boolean", + "default": false + }, + "interval": { + "description": "Interval in seconds between GC scheduling sweeps (default: 60).\n\nThe controller scans for active physical table revisions at this\ninterval and schedules GC jobs for each one.", + "$ref": "#/$defs/ConfigDuration" + } + } + }, "KafkaEventsConfig": { "description": "Kafka configuration for worker events.", "type": "object", diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 85eaff59a..f64f43851 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -22,6 +22,7 @@ amp-parquet = { path = "../crates/core/parquet" } amp-providers-registry = { path = "../crates/core/providers-registry" } amp-providers-solana = { path = 
"../crates/core/providers-solana" } amp-worker-core = { path = "../crates/core/worker-core" } +amp-worker-gc = { path = "../crates/core/worker-gc" } amp-client-admin = { package = "admin-client", path = "../crates/clients/admin" } ampctl = { path = "../crates/bin/ampctl" } anyhow.workspace = true @@ -51,6 +52,7 @@ monitoring = { path = "../crates/core/monitoring" } object_store.workspace = true opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["testing"] } +pgtemp.workspace = true pretty_assertions.workspace = true rand.workspace = true reqwest.workspace = true diff --git a/tests/src/tests/it_gc.rs b/tests/src/tests/it_gc.rs new file mode 100644 index 000000000..c655707aa --- /dev/null +++ b/tests/src/tests/it_gc.rs @@ -0,0 +1,260 @@ +//! Integration tests for GC job execution. +//! +//! These tests require a running PostgreSQL instance (provided by `pgtemp`). +//! They verify the full collection algorithm: expired file streaming, +//! metadata deletion, and physical file deletion. 
+ +use std::time::Duration; + +use amp_data_store::DataStore; +use amp_worker_gc::{job_ctx::Context, job_descriptor::JobDescriptor}; +use futures::TryStreamExt; +use metadata_db::{ + datasets::{DatasetName, DatasetNamespace}, + files::{self, FileName}, + gc, physical_table, + physical_table_revision::{self, LocationId, TablePath}, +}; +use pgtemp::PgTempDB; +use url::Url; + +fn create_temp_dir(name: &str) -> std::path::PathBuf { + let dir = std::env::temp_dir().join(format!("amp_gc_test_{name}_{}", std::process::id())); + let _ = std::fs::remove_dir_all(&dir); + std::fs::create_dir_all(&dir).expect("Failed to create temp dir"); + dir +} + +const TEST_WORKER_ID: &str = "test-worker"; + +async fn setup_test_db() -> (PgTempDB, metadata_db::MetadataDb) { + let temp_db = PgTempDB::new(); + let conn = metadata_db::connect_pool_with_retry(&temp_db.connection_uri(), 5) + .await + .expect("Failed to connect to metadata db"); + + // Register a worker (required for foreign key constraints) + let worker_id = metadata_db::workers::WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); + let raw: Box = + serde_json::from_str("{}").expect("empty JSON should be valid"); + let worker_info = metadata_db::workers::WorkerInfo::from_owned_unchecked(raw); + metadata_db::workers::register(&conn, worker_id, worker_info) + .await + .expect("Failed to register worker"); + + (temp_db, conn) +} + +async fn register_table_and_revision(conn: &metadata_db::MetadataDb, path: &str) -> LocationId { + let namespace = DatasetNamespace::from_ref_unchecked("test_ns"); + let name = DatasetName::from_ref_unchecked("test_dataset"); + let hash = metadata_db::manifests::ManifestHash::from_ref_unchecked( + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ); + let table_name = physical_table::TableName::from_ref_unchecked("test_table"); + + physical_table::register(conn, &namespace, &name, &hash, &table_name) + .await + .expect("Failed to register physical table"); + + let metadata_json = 
serde_json::json!({ + "dataset_namespace": "test_ns", + "dataset_name": "test_dataset", + "manifest_hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "table_name": "test_table", + }); + let raw = + serde_json::value::to_raw_value(&metadata_json).expect("test metadata should serialize"); + let metadata = physical_table_revision::RevisionMetadata::from_owned_unchecked(raw); + let table_path = TablePath::from_ref_unchecked(path); + + physical_table_revision::register(conn, table_path, metadata) + .await + .expect("Failed to register revision") +} + +async fn register_file( + conn: &metadata_db::MetadataDb, + location_id: LocationId, + file_name: &str, + url: &Url, +) { + files::register( + conn, + location_id, + url, + FileName::from_ref_unchecked(file_name), + 100, // object_size + None, + None, + serde_json::json!({}), // parquet_meta + &vec![0u8; 10], // footer + ) + .await + .expect("Failed to register file"); +} + +#[tokio::test] +async fn gc_deletes_expired_files() { + //* Given + let (_db, conn) = setup_test_db().await; + + // Create a temp directory for the object store + let temp_dir = create_temp_dir("deletes_expired"); + let data_url = + amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) + .expect("Failed to create object store URL"); + + let data_store = + DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); + + // Register a physical table and revision + let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; + let location_id = register_table_and_revision(&conn, revision_path).await; + + // Create physical files on disk + let revision_dir = temp_dir.join(revision_path); + std::fs::create_dir_all(&revision_dir).expect("Failed to create revision dir"); + std::fs::write(revision_dir.join("file_a.parquet"), b"test data a") + .expect("Failed to write file_a"); + std::fs::write(revision_dir.join("file_b.parquet"), b"test data b") + .expect("Failed to write 
file_b"); + + // Register file metadata in Postgres + let base_url = Url::parse(&format!("file://{}/{}/", temp_dir.display(), revision_path,)) + .expect("Failed to parse URL"); + + let url_a = base_url.join("file_a.parquet").unwrap(); + let url_b = base_url.join("file_b.parquet").unwrap(); + register_file(&conn, location_id, "file_a.parquet", &url_a).await; + register_file(&conn, location_id, "file_b.parquet", &url_b).await; + + // Get file IDs from the database + let file_ids: Vec = + files::stream_by_location_id_with_details(&conn, location_id) + .map_ok(|f| f.id) + .try_collect() + .await + .expect("Failed to stream files"); + assert_eq!(file_ids.len(), 2, "should have 2 registered files"); + + // Schedule files for GC with 1s duration (minimum allowed by DB constraint). + // Then wait for expiration before running GC. + gc::upsert(&conn, location_id, &file_ids, Duration::from_secs(1)) + .await + .expect("Failed to upsert GC manifest"); + + // Wait for the files to expire + tokio::time::sleep(Duration::from_secs(2)).await; + + //* When + let ctx = Context { + metadata_db: conn.clone(), + data_store, + meter: None, + }; + let desc = JobDescriptor { location_id }; + + amp_worker_gc::job_impl::execute(ctx, desc) + .await + .expect("GC execution failed"); + + //* Then + // Verify metadata is gone + let remaining_files: Vec = + files::stream_by_location_id_with_details(&conn, location_id) + .map_ok(|f| f.id) + .try_collect() + .await + .expect("Failed to stream files"); + assert!( + remaining_files.is_empty(), + "all file metadata should be deleted" + ); + + // Verify GC manifest is empty (cascaded from file_metadata deletion) + let expired: Vec = gc::stream_expired(&conn, location_id) + .try_collect() + .await + .expect("Failed to stream expired"); + assert!(expired.is_empty(), "GC manifest should be empty"); + + // Verify physical files are gone + assert!( + !revision_dir.join("file_a.parquet").exists(), + "file_a.parquet should be deleted from disk" + ); + 
assert!( + !revision_dir.join("file_b.parquet").exists(), + "file_b.parquet should be deleted from disk" + ); +} + +#[tokio::test] +async fn gc_with_no_expired_files_is_a_noop() { + //* Given + let (_db, conn) = setup_test_db().await; + + let temp_dir = create_temp_dir("noop"); + let data_url = + amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) + .expect("Failed to create object store URL"); + + let data_store = + DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); + + let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; + let location_id = register_table_and_revision(&conn, revision_path).await; + + // No files registered, no GC manifest entries + + //* When + let ctx = Context { + metadata_db: conn.clone(), + data_store, + meter: None, + }; + let desc = JobDescriptor { location_id }; + + amp_worker_gc::job_impl::execute(ctx, desc) + .await + .expect("GC execution should succeed with no expired files"); + + //* Then — no errors, no panics +} + +#[tokio::test] +async fn gc_with_nonexistent_location_returns_error() { + //* Given + let (_db, conn) = setup_test_db().await; + + let temp_dir = create_temp_dir("nonexistent"); + let data_url = + amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) + .expect("Failed to create object store URL"); + + let data_store = + DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); + + //* When + let ctx = Context { + metadata_db: conn.clone(), + data_store, + meter: None, + }; + let desc = JobDescriptor { + location_id: LocationId::try_from(999999i64).unwrap(), + }; + + let result = amp_worker_gc::job_impl::execute(ctx, desc).await; + + //* Then + assert!(result.is_err(), "should fail with LocationNotFound"); + assert!( + matches!( + result.unwrap_err(), + amp_worker_gc::error::Error::LocationNotFound(_) + ), + "error should be LocationNotFound" + ); +} diff --git a/tests/src/tests/mod.rs 
b/tests/src/tests/mod.rs index 875e6a716..22db01155 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -16,6 +16,7 @@ mod it_dependencies; mod it_dump; mod it_functions; mod it_functions_eth_call; +mod it_gc; mod it_joins; mod it_multi_network_batch; mod it_multi_table_continuous; From 963cf0d72d2eaadf16211b80147e74645133c315 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 18 Mar 2026 09:59:44 -0500 Subject: [PATCH 04/11] fix(monitoring): add amp_worker_gc to workspace crates list The workspace_crates_match_amp_crates_list test validates that the hardcoded AMP_CRATES list matches actual workspace members. Adding the new amp-worker-gc crate to the workspace requires updating this list. --- crates/core/monitoring/src/logging.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/monitoring/src/logging.rs b/crates/core/monitoring/src/logging.rs index 35432863a..cd684e922 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -81,6 +81,7 @@ const AMP_CRATES: &[&str] = &[ "amp_worker_core", "amp_worker_datasets_derived", "amp_worker_datasets_raw", + "amp_worker_gc", "ampcc", "ampctl", "ampd", From c315b3282d79f2d0ce876cbea655799c1d440ea1 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 18 Mar 2026 10:54:41 -0500 Subject: [PATCH 05/11] feat(controller,amp-worker-gc): add controller-level metrics and table_ref tracing - Add GcSchedulerMetrics to controller scheduler with gc_jobs_dispatched_total, gc_jobs_skipped_in_flight_total, and gc_jobs_skipped_too_recent_total counters - Add table_ref field to GC job execution tracing span, recorded from the revision path after lookup - Pass Meter to Scheduler for metrics initialization --- crates/core/worker-gc/src/job_impl.rs | 5 ++- crates/services/controller/src/scheduler.rs | 50 ++++++++++++++++++++- crates/services/controller/src/service.rs | 2 +- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git 
a/crates/core/worker-gc/src/job_impl.rs b/crates/core/worker-gc/src/job_impl.rs index e4f7ba8bc..3d35a26b9 100644 --- a/crates/core/worker-gc/src/job_impl.rs +++ b/crates/core/worker-gc/src/job_impl.rs @@ -24,7 +24,7 @@ use crate::{error::Error, job_ctx::Context, job_descriptor::JobDescriptor, metri /// /// Metadata is deleted before physical files. If the process crashes between steps 3 and 4, /// orphaned files may remain in storage but no dangling metadata references will exist. -#[tracing::instrument(skip_all, err, fields(location_id = %desc.location_id))] +#[tracing::instrument(skip_all, err, fields(location_id = %desc.location_id, table_ref))] pub async fn execute(ctx: Context, desc: JobDescriptor) -> Result<(), Error> { let location_id = desc.location_id; @@ -35,6 +35,9 @@ pub async fn execute(ctx: Context, desc: JobDescriptor) -> Result<(), Error> { .map_err(Error::MetadataDb)? .ok_or(Error::LocationNotFound(location_id))?; + // Record table_ref on the current span now that we have the revision path + tracing::Span::current().record("table_ref", revision.path.as_str()); + let revision_path: PhyTableRevisionPath = revision.path.into(); let metrics = ctx.meter.as_ref().map(|m| GcMetrics::new(m, *location_id)); diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 2c95df293..0c15e57cd 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -63,12 +63,49 @@ const DEAD_WORKER_INTERVAL: Duration = Duration::from_secs(5); /// Thread-safe for sharing across async tasks via `Arc`. pub struct Scheduler { metadata_db: MetadataDb, + gc_metrics: Option, +} + +/// Controller-level metrics for GC job scheduling. 
+struct GcSchedulerMetrics { + jobs_dispatched: monitoring::telemetry::metrics::Counter, + jobs_skipped_in_flight: monitoring::telemetry::metrics::Counter, + jobs_skipped_too_recent: monitoring::telemetry::metrics::Counter, +} + +impl GcSchedulerMetrics { + fn new(meter: &monitoring::telemetry::metrics::Meter) -> Self { + use monitoring::telemetry::metrics::Counter; + Self { + jobs_dispatched: Counter::new( + meter, + "gc_jobs_dispatched_total", + "Total number of GC jobs dispatched to workers", + ), + jobs_skipped_in_flight: Counter::new( + meter, + "gc_jobs_skipped_in_flight_total", + "Total number of GC scheduling attempts skipped because a job is already in-flight", + ), + jobs_skipped_too_recent: Counter::new( + meter, + "gc_jobs_skipped_too_recent_total", + "Total number of GC scheduling attempts skipped because last completion was too recent", + ), + } + } } impl Scheduler { /// Create a new scheduler instance - pub fn new(metadata_db: MetadataDb) -> Self { - Self { metadata_db } + pub fn new( + metadata_db: MetadataDb, + meter: Option<&monitoring::telemetry::metrics::Meter>, + ) -> Self { + Self { + metadata_db, + gc_metrics: meter.map(GcSchedulerMetrics::new), + } } /// Schedule a job with a pre-built descriptor @@ -396,6 +433,9 @@ impl Scheduler { if let Some(job) = &existing { // Skip if job is still in-flight if !job.status.is_terminal() { + if let Some(m) = &self.gc_metrics { + m.jobs_skipped_in_flight.inc_by(1); + } continue; } @@ -405,6 +445,9 @@ impl Scheduler { let updated: std::time::SystemTime = job.updated_at.into(); let elapsed = now.duration_since(updated).unwrap_or_default(); if elapsed < gc_interval { + if let Some(m) = &self.gc_metrics { + m.jobs_skipped_too_recent.inc_by(1); + } continue; } } @@ -421,6 +464,9 @@ impl Scheduler { .await { Ok(job_id) => { + if let Some(m) = &self.gc_metrics { + m.jobs_dispatched.inc_by(1); + } tracing::debug!( location_id = %revision.id, %job_id, diff --git a/crates/services/controller/src/service.rs 
b/crates/services/controller/src/service.rs index f22279b49..098aea9ec 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -46,7 +46,7 @@ pub async fn new( ) -> Result<(SocketAddr, impl Future>), Error> { let build_info = build_info.into(); - let scheduler = Arc::new(Scheduler::new(metadata_db.clone())); + let scheduler = Arc::new(Scheduler::new(metadata_db.clone(), meter.as_ref())); let ctx = Ctx { metadata_db, From d75418506a34b5751a4d81f1ebc5d1414d326b98 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Fri, 20 Mar 2026 09:53:55 -0500 Subject: [PATCH 06/11] refactor(controller): pre-filter GC scheduling by expired gc_manifest entries Instead of paginating through all active revisions and scheduling no-op GC jobs, query gc_manifest for distinct location_ids with expired entries. This reduces unnecessary job ledger writes and worker notifications. Also removes the active-only filter so deactivated revisions are GC-eligible. GC is only concerned with the state of the gc_manifest, not the revision's active status. --- crates/core/metadata-db/src/gc.rs | 11 ++ crates/core/metadata-db/src/gc/sql.rs | 15 ++ crates/services/controller/src/scheduler.rs | 146 +++++++++----------- 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/crates/core/metadata-db/src/gc.rs b/crates/core/metadata-db/src/gc.rs index 3f3baa5ae..f24242a95 100644 --- a/crates/core/metadata-db/src/gc.rs +++ b/crates/core/metadata-db/src/gc.rs @@ -30,6 +30,17 @@ where .map_err(Error::Database) } +/// Returns distinct location IDs that have at least one expired entry in the GC manifest. +#[tracing::instrument(skip(exe), err)] +pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result, Error> +where + E: Executor<'c>, +{ + sql::locations_with_expired_entries(exe) + .await + .map_err(Error::Database) +} + /// Streams files that have passed their expiration time and are ready for deletion. 
#[tracing::instrument(skip(exe))] pub fn stream_expired<'c, E>( diff --git a/crates/core/metadata-db/src/gc/sql.rs b/crates/core/metadata-db/src/gc/sql.rs index 51a101755..25050b5f6 100644 --- a/crates/core/metadata-db/src/gc/sql.rs +++ b/crates/core/metadata-db/src/gc/sql.rs @@ -82,6 +82,21 @@ where Box::pin(sqlx::query_as(query).bind(location_id).fetch(exe)) } +/// Returns distinct location IDs that have at least one expired entry in the GC manifest. +pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Postgres>, +{ + let query = indoc::indoc! {r#" + SELECT DISTINCT location_id + FROM gc_manifest + WHERE expiration < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + ORDER BY location_id; + "#}; + + sqlx::query_scalar(query).fetch_all(exe).await +} + /// GC manifest row from the database #[derive(Debug, sqlx::FromRow)] pub struct GcManifestRow { diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 0c15e57cd..54a03e4fb 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -394,106 +394,88 @@ impl Scheduler { Ok(()) } - /// Schedule GC jobs for all active physical table revisions. + /// Schedule GC jobs for locations that have expired entries in the GC manifest. /// - /// Paginates through all active revisions and schedules a GC job for each one, - /// respecting the configured interval. For each location: + /// Pre-filters by querying `gc_manifest` for distinct location IDs with expired + /// entries, then for each location: /// /// 1. Look up the existing job by idempotency key. /// 2. If in-flight (non-terminal state), skip. /// 3. If completed recently (`now - updated_at < gc_interval`), skip. /// 4. Otherwise, schedule a new job. + /// + /// This includes both active and deactivated revisions — GC is only concerned + /// with the state of the GC manifest, not the revision's active status. 
pub async fn schedule_gc_jobs(&self, gc_interval: Duration) -> Result<(), ScheduleGcJobsError> { - let page_size: i64 = 100; - let mut last_id: Option = None; - - loop { - let revisions = metadata_db::physical_table_revision::list( - &self.metadata_db, - page_size, - last_id, - Some(true), - ) + let locations = metadata_db::gc::locations_with_expired_entries(&self.metadata_db) .await - .map_err(ScheduleGcJobsError::ListRevisions)?; + .map_err(ScheduleGcJobsError::ListExpiredLocations)?; - if revisions.is_empty() { - break; - } + for location_id in locations { + let idempotency_key = amp_worker_gc::job_key::idempotency_key(location_id); - for revision in &revisions { - let idempotency_key = amp_worker_gc::job_key::idempotency_key(revision.id); + // Check if a GC job already exists for this location + let existing = + metadata_db::jobs::get_by_idempotency_key(&self.metadata_db, &idempotency_key) + .await + .map_err(ScheduleGcJobsError::CheckExistingJob)?; - // Check if a GC job already exists for this location - let existing = - metadata_db::jobs::get_by_idempotency_key(&self.metadata_db, &idempotency_key) - .await - .map_err(ScheduleGcJobsError::CheckExistingJob)?; + if let Some(job) = &existing { + // Skip if job is still in-flight + if !job.status.is_terminal() { + if let Some(m) = &self.gc_metrics { + m.jobs_skipped_in_flight.inc_by(1); + } + continue; + } - if let Some(job) = &existing { - // Skip if job is still in-flight - if !job.status.is_terminal() { + // Skip if last completion was too recent + if job.status == JobStatus::Completed.into() { + let now = std::time::SystemTime::now(); + let updated: std::time::SystemTime = job.updated_at.into(); + let elapsed = now.duration_since(updated).unwrap_or_default(); + if elapsed < gc_interval { if let Some(m) = &self.gc_metrics { - m.jobs_skipped_in_flight.inc_by(1); + m.jobs_skipped_too_recent.inc_by(1); } continue; } - - // Skip if last completion was too recent - if job.status == JobStatus::Completed.into() { - 
let now = std::time::SystemTime::now(); - let updated: std::time::SystemTime = job.updated_at.into(); - let elapsed = now.duration_since(updated).unwrap_or_default(); - if elapsed < gc_interval { - if let Some(m) = &self.gc_metrics { - m.jobs_skipped_too_recent.inc_by(1); - } - continue; - } - } } + } - let descriptor = admin_api::scheduler::JobDescriptor::from( - amp_worker_gc::job_descriptor::JobDescriptor { - location_id: revision.id, - }, - ); + let descriptor = admin_api::scheduler::JobDescriptor::from( + amp_worker_gc::job_descriptor::JobDescriptor { location_id }, + ); - match self - .schedule_job_impl(idempotency_key, descriptor, None) - .await - { - Ok(job_id) => { - if let Some(m) = &self.gc_metrics { - m.jobs_dispatched.inc_by(1); - } - tracing::debug!( - location_id = %revision.id, - %job_id, - "scheduled GC job" - ); - } - Err(ScheduleJobError::NoWorkersAvailable) => { - tracing::warn!("no workers available for GC scheduling"); - return Ok(()); - } - Err(ScheduleJobError::ActiveJobConflict { .. }) => { - // GC job already running for this location, skip - } - Err(err) => { - tracing::error!( - location_id = %revision.id, - error = %err, - error_source = logging::error_source(&err), - "failed to schedule GC job" - ); + match self + .schedule_job_impl(idempotency_key, descriptor, None) + .await + { + Ok(job_id) => { + if let Some(m) = &self.gc_metrics { + m.jobs_dispatched.inc_by(1); } + tracing::debug!( + %location_id, + %job_id, + "scheduled GC job" + ); + } + Err(ScheduleJobError::NoWorkersAvailable) => { + tracing::warn!("no workers available for GC scheduling"); + return Ok(()); + } + Err(ScheduleJobError::ActiveJobConflict { .. 
}) => { + // GC job already running for this location, skip + } + Err(err) => { + tracing::error!( + %location_id, + error = %err, + error_source = logging::error_source(&err), + "failed to schedule GC job" + ); } - } - - last_id = revisions.last().map(|r| r.id); - if revisions.len() < page_size as usize { - break; } } @@ -504,9 +486,9 @@ impl Scheduler { /// Errors that occur when scheduling GC jobs [`Scheduler::schedule_gc_jobs`] #[derive(Debug, thiserror::Error)] pub enum ScheduleGcJobsError { - /// Failed to list active physical table revisions - #[error("failed to list active revisions")] - ListRevisions(#[source] metadata_db::Error), + /// Failed to query locations with expired GC manifest entries + #[error("failed to list locations with expired GC entries")] + ListExpiredLocations(#[source] metadata_db::Error), /// Failed to check existing job for a location #[error("failed to check existing GC job")] From f4746411cbb2247a36b585667d8130e667c1496d Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Fri, 20 Mar 2026 10:35:05 -0500 Subject: [PATCH 07/11] test(gc): add integration tests for GC scheduling pre-filter Add 3 integration tests for the gc::locations_with_expired_entries() query (introduced in the previous commit) that the controller scheduler uses to pre-filter GC scheduling, verifying: only locations with expired entries are returned, empty results when nothing has expired, and inclusion of deactivated revisions.
--- tests/src/tests/it_gc.rs | 134 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 3 deletions(-) diff --git a/tests/src/tests/it_gc.rs b/tests/src/tests/it_gc.rs index c655707aa..b3dbc6fab 100644 --- a/tests/src/tests/it_gc.rs +++ b/tests/src/tests/it_gc.rs @@ -1,8 +1,10 @@ -//! Integration tests for GC job execution. +//! Integration tests for GC job execution and scheduling. //! //! These tests require a running PostgreSQL instance (provided by `pgtemp`). -//! They verify the full collection algorithm: expired file streaming, -//! metadata deletion, and physical file deletion. +//! They verify: +//! - The full collection algorithm: expired file streaming, metadata deletion, +//! and physical file deletion. +//! - The GC manifest pre-filter query used by the controller scheduler. use std::time::Duration; @@ -258,3 +260,129 @@ async fn gc_with_nonexistent_location_returns_error() { "error should be LocationNotFound" ); } + +// --- Scheduling pre-filter tests --- +// These test the `gc::locations_with_expired_entries` query that the controller +// scheduler uses to pre-filter which locations need GC. 
+ +#[tokio::test] +async fn locations_with_expired_entries_returns_only_locations_with_expired_files() { + //* Given + let (_db, conn) = setup_test_db().await; + + // Create two revisions + let loc_a = register_table_and_revision(&conn, "test_ns/ds_a/aaaaaaaaaa/table_a").await; + let loc_b = register_table_and_revision(&conn, "test_ns/ds_b/aaaaaaaaaa/table_b").await; + + // Register a file for each + let url_a = Url::parse("file:///tmp/a.parquet").unwrap(); + let url_b = Url::parse("file:///tmp/b.parquet").unwrap(); + register_file(&conn, loc_a, "a.parquet", &url_a).await; + register_file(&conn, loc_b, "b.parquet", &url_b).await; + + // Get file IDs + let file_ids_a: Vec = files::stream_by_location_id_with_details(&conn, loc_a) + .map_ok(|f| f.id) + .try_collect() + .await + .unwrap(); + let file_ids_b: Vec = files::stream_by_location_id_with_details(&conn, loc_b) + .map_ok(|f| f.id) + .try_collect() + .await + .unwrap(); + + // Only schedule loc_a for GC (with 1s expiry), leave loc_b without GC entries + gc::upsert(&conn, loc_a, &file_ids_a, Duration::from_secs(1)) + .await + .unwrap(); + + // Also schedule loc_b but with a long expiry (not yet expired) + gc::upsert(&conn, loc_b, &file_ids_b, Duration::from_secs(3600)) + .await + .unwrap(); + + // Wait for loc_a's entries to expire + tokio::time::sleep(Duration::from_secs(2)).await; + + //* When + let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + + //* Then — only loc_a should appear (expired), not loc_b (not yet expired) + assert_eq!( + locations.len(), + 1, + "only one location should have expired entries" + ); + assert_eq!(locations[0], loc_a, "the expired location should be loc_a"); +} + +#[tokio::test] +async fn locations_with_expired_entries_returns_empty_when_no_expired_files() { + //* Given + let (_db, conn) = setup_test_db().await; + + let loc = register_table_and_revision(&conn, "test_ns/ds_empty/aaaaaaaaaa/table_empty").await; + let url = 
Url::parse("file:///tmp/empty.parquet").unwrap(); + register_file(&conn, loc, "empty.parquet", &url).await; + + let file_ids: Vec = files::stream_by_location_id_with_details(&conn, loc) + .map_ok(|f| f.id) + .try_collect() + .await + .unwrap(); + + // Schedule with a long expiry — nothing should be expired + gc::upsert(&conn, loc, &file_ids, Duration::from_secs(3600)) + .await + .unwrap(); + + //* When + let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + + //* Then + assert!( + locations.is_empty(), + "no locations should have expired entries" + ); +} + +#[tokio::test] +async fn locations_with_expired_entries_includes_deactivated_revisions() { + //* Given + let (_db, conn) = setup_test_db().await; + + // Register a revision (inactive by default — never called mark_active_by_id) + let loc = + register_table_and_revision(&conn, "test_ns/ds_inactive/aaaaaaaaaa/table_inactive").await; + + let url = Url::parse("file:///tmp/inactive.parquet").unwrap(); + register_file(&conn, loc, "inactive.parquet", &url).await; + + let file_ids: Vec = files::stream_by_location_id_with_details(&conn, loc) + .map_ok(|f| f.id) + .try_collect() + .await + .unwrap(); + + // Schedule for GC with 1s expiry + gc::upsert(&conn, loc, &file_ids, Duration::from_secs(1)) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(2)).await; + + //* When + let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + + //* Then — deactivated revision should still appear + assert_eq!( + locations.len(), + 1, + "deactivated revision should have expired entries" + ); + assert_eq!( + locations[0], loc, + "the expired location should be the deactivated one" + ); +} From f375e7a02d37ad8f9ee227ee629575ee81faeb86 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Fri, 20 Mar 2026 11:20:17 -0500 Subject: [PATCH 08/11] refactor(tests): use MetadataDb fixture and GcTestCtx wrapper in GC tests Replace raw pgtemp usage with the testlib MetadataDbFixture for 
consistency with other integration tests. Extract shared setup into a GcTestCtx wrapper struct with helper methods (register_revision, register_file, file_ids, gc_context, data_store). Remove pgtemp direct dependency from tests crate. --- tests/Cargo.toml | 1 - tests/src/tests/it_gc.rs | 392 ++++++++++++++++++--------------------- 2 files changed, 184 insertions(+), 209 deletions(-) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index f64f43851..67ba10350 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -52,7 +52,6 @@ monitoring = { path = "../crates/core/monitoring" } object_store.workspace = true opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true, features = ["testing"] } -pgtemp.workspace = true pretty_assertions.workspace = true rand.workspace = true reqwest.workspace = true diff --git a/tests/src/tests/it_gc.rs b/tests/src/tests/it_gc.rs index b3dbc6fab..d625ebc62 100644 --- a/tests/src/tests/it_gc.rs +++ b/tests/src/tests/it_gc.rs @@ -1,7 +1,6 @@ //! Integration tests for GC job execution and scheduling. //! -//! These tests require a running PostgreSQL instance (provided by `pgtemp`). -//! They verify: +//! These tests verify: //! - The full collection algorithm: expired file streaming, metadata deletion, //! and physical file deletion. //! - The GC manifest pre-filter query used by the controller scheduler. 
@@ -17,171 +16,194 @@ use metadata_db::{ gc, physical_table, physical_table_revision::{self, LocationId, TablePath}, }; -use pgtemp::PgTempDB; +use tempfile::TempDir; use url::Url; -fn create_temp_dir(name: &str) -> std::path::PathBuf { - let dir = std::env::temp_dir().join(format!("amp_gc_test_{name}_{}", std::process::id())); - let _ = std::fs::remove_dir_all(&dir); - std::fs::create_dir_all(&dir).expect("Failed to create temp dir"); - dir -} - -const TEST_WORKER_ID: &str = "test-worker"; - -async fn setup_test_db() -> (PgTempDB, metadata_db::MetadataDb) { - let temp_db = PgTempDB::new(); - let conn = metadata_db::connect_pool_with_retry(&temp_db.connection_uri(), 5) - .await - .expect("Failed to connect to metadata db"); - - // Register a worker (required for foreign key constraints) - let worker_id = metadata_db::workers::WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); - let raw: Box = - serde_json::from_str("{}").expect("empty JSON should be valid"); - let worker_info = metadata_db::workers::WorkerInfo::from_owned_unchecked(raw); - metadata_db::workers::register(&conn, worker_id, worker_info) - .await - .expect("Failed to register worker"); +use crate::testlib::fixtures::MetadataDb as MetadataDbFixture; - (temp_db, conn) +/// Test context for GC tests. +/// +/// Wraps the shared test fixtures (metadata DB, test directory) and provides +/// convenience helpers for setting up test data. 
+struct GcTestCtx { + conn: metadata_db::MetadataDb, + test_dir: TempDir, + _metadata_db: MetadataDbFixture, } -async fn register_table_and_revision(conn: &metadata_db::MetadataDb, path: &str) -> LocationId { - let namespace = DatasetNamespace::from_ref_unchecked("test_ns"); - let name = DatasetName::from_ref_unchecked("test_dataset"); - let hash = metadata_db::manifests::ManifestHash::from_ref_unchecked( - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - ); - let table_name = physical_table::TableName::from_ref_unchecked("test_table"); +impl GcTestCtx { + /// Create a new GC test context with an isolated Postgres instance and temp directory. + async fn setup(test_name: &str) -> Self { + monitoring::logging::init(); - physical_table::register(conn, &namespace, &name, &hash, &table_name) - .await - .expect("Failed to register physical table"); - - let metadata_json = serde_json::json!({ - "dataset_namespace": "test_ns", - "dataset_name": "test_dataset", - "manifest_hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "table_name": "test_table", - }); - let raw = - serde_json::value::to_raw_value(&metadata_json).expect("test metadata should serialize"); - let metadata = physical_table_revision::RevisionMetadata::from_owned_unchecked(raw); - let table_path = TablePath::from_ref_unchecked(path); - - physical_table_revision::register(conn, table_path, metadata) + let metadata_db = MetadataDbFixture::new().await; + let conn = metadata_db.conn_pool().clone(); + + // Register a worker (required for foreign key constraints on job tables) + let worker_id = metadata_db::workers::WorkerNodeId::from_ref_unchecked("gc-test-worker"); + let raw: Box = + serde_json::from_str("{}").expect("empty JSON should be valid"); + let worker_info = metadata_db::workers::WorkerInfo::from_owned_unchecked(raw); + metadata_db::workers::register(&conn, worker_id, worker_info) + .await + .expect("failed to register worker"); + + let test_dir = 
tempfile::Builder::new() + .prefix(&format!("gc_test__{test_name}__")) + .tempdir() + .expect("failed to create test dir"); + + Self { + conn, + test_dir, + _metadata_db: metadata_db, + } + } + + /// Create a DataStore backed by the test directory's filesystem. + fn data_store(&self) -> DataStore { + let data_url = amp_object_store::url::ObjectStoreUrl::new(format!( + "file://{}/", + self.test_dir.path().display() + )) + .expect("failed to create object store URL"); + + DataStore::new(self.conn.clone(), data_url, 0).expect("failed to create data store") + } + + /// Register a physical table and revision, returning its location ID. + async fn register_revision(&self, path: &str) -> LocationId { + let namespace = DatasetNamespace::from_ref_unchecked("test_ns"); + let name = DatasetName::from_ref_unchecked("test_dataset"); + let hash = metadata_db::manifests::ManifestHash::from_ref_unchecked( + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ); + let table_name = physical_table::TableName::from_ref_unchecked("test_table"); + + physical_table::register(&self.conn, &namespace, &name, &hash, &table_name) + .await + .expect("failed to register physical table"); + + let metadata_json = serde_json::json!({ + "dataset_namespace": "test_ns", + "dataset_name": "test_dataset", + "manifest_hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "table_name": "test_table", + }); + let raw = serde_json::value::to_raw_value(&metadata_json) + .expect("test metadata should serialize"); + let metadata = physical_table_revision::RevisionMetadata::from_owned_unchecked(raw); + let table_path = TablePath::from_ref_unchecked(path); + + physical_table_revision::register(&self.conn, table_path, metadata) + .await + .expect("failed to register revision") + } + + /// Register a file in the metadata database for a given location. 
+ async fn register_file(&self, location_id: LocationId, file_name: &str, url: &Url) { + files::register( + &self.conn, + location_id, + url, + FileName::from_ref_unchecked(file_name), + 100, + None, + None, + serde_json::json!({}), + &vec![0u8; 10], + ) .await - .expect("Failed to register revision") -} + .expect("failed to register file"); + } -async fn register_file( - conn: &metadata_db::MetadataDb, - location_id: LocationId, - file_name: &str, - url: &Url, -) { - files::register( - conn, - location_id, - url, - FileName::from_ref_unchecked(file_name), - 100, // object_size - None, - None, - serde_json::json!({}), // parquet_meta - &vec![0u8; 10], // footer - ) - .await - .expect("Failed to register file"); + /// Get all file IDs for a given location. + async fn file_ids(&self, location_id: LocationId) -> Vec { + files::stream_by_location_id_with_details(&self.conn, location_id) + .map_ok(|f| f.id) + .try_collect() + .await + .expect("failed to stream files") + } + + /// Build a GC execution context. 
+ fn gc_context(&self) -> Context { + Context { + metadata_db: self.conn.clone(), + data_store: self.data_store(), + meter: None, + } + } } +// --- GC execution tests --- + #[tokio::test] async fn gc_deletes_expired_files() { //* Given - let (_db, conn) = setup_test_db().await; - - // Create a temp directory for the object store - let temp_dir = create_temp_dir("deletes_expired"); - let data_url = - amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) - .expect("Failed to create object store URL"); + let ctx = GcTestCtx::setup("gc_deletes_expired").await; - let data_store = - DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); - - // Register a physical table and revision let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; - let location_id = register_table_and_revision(&conn, revision_path).await; + let location_id = ctx.register_revision(revision_path).await; // Create physical files on disk - let revision_dir = temp_dir.join(revision_path); - std::fs::create_dir_all(&revision_dir).expect("Failed to create revision dir"); + let revision_dir = ctx.test_dir.path().join(revision_path); + std::fs::create_dir_all(&revision_dir).expect("failed to create revision dir"); std::fs::write(revision_dir.join("file_a.parquet"), b"test data a") - .expect("Failed to write file_a"); + .expect("failed to write file_a"); std::fs::write(revision_dir.join("file_b.parquet"), b"test data b") - .expect("Failed to write file_b"); + .expect("failed to write file_b"); // Register file metadata in Postgres - let base_url = Url::parse(&format!("file://{}/{}/", temp_dir.display(), revision_path,)) - .expect("Failed to parse URL"); - - let url_a = base_url.join("file_a.parquet").unwrap(); - let url_b = base_url.join("file_b.parquet").unwrap(); - register_file(&conn, location_id, "file_a.parquet", &url_a).await; - register_file(&conn, location_id, "file_b.parquet", &url_b).await; + let base_url = Url::parse(&format!( + 
"file://{}/{}/", + ctx.test_dir.path().display(), + revision_path, + )) + .expect("failed to parse URL"); + + ctx.register_file( + location_id, + "file_a.parquet", + &base_url.join("file_a.parquet").unwrap(), + ) + .await; + ctx.register_file( + location_id, + "file_b.parquet", + &base_url.join("file_b.parquet").unwrap(), + ) + .await; - // Get file IDs from the database - let file_ids: Vec = - files::stream_by_location_id_with_details(&conn, location_id) - .map_ok(|f| f.id) - .try_collect() - .await - .expect("Failed to stream files"); + let file_ids = ctx.file_ids(location_id).await; assert_eq!(file_ids.len(), 2, "should have 2 registered files"); - // Schedule files for GC with 1s duration (minimum allowed by DB constraint). - // Then wait for expiration before running GC. - gc::upsert(&conn, location_id, &file_ids, Duration::from_secs(1)) + // Schedule files for GC with 1s duration (minimum allowed by DB constraint) + gc::upsert(&ctx.conn, location_id, &file_ids, Duration::from_secs(1)) .await - .expect("Failed to upsert GC manifest"); + .expect("failed to upsert GC manifest"); // Wait for the files to expire tokio::time::sleep(Duration::from_secs(2)).await; //* When - let ctx = Context { - metadata_db: conn.clone(), - data_store, - meter: None, - }; - let desc = JobDescriptor { location_id }; - - amp_worker_gc::job_impl::execute(ctx, desc) + amp_worker_gc::job_impl::execute(ctx.gc_context(), JobDescriptor { location_id }) .await .expect("GC execution failed"); //* Then - // Verify metadata is gone - let remaining_files: Vec = - files::stream_by_location_id_with_details(&conn, location_id) - .map_ok(|f| f.id) - .try_collect() - .await - .expect("Failed to stream files"); + let remaining_files = ctx.file_ids(location_id).await; assert!( remaining_files.is_empty(), "all file metadata should be deleted" ); - // Verify GC manifest is empty (cascaded from file_metadata deletion) - let expired: Vec = gc::stream_expired(&conn, location_id) + let expired: Vec = 
gc::stream_expired(&ctx.conn, location_id) .try_collect() .await - .expect("Failed to stream expired"); + .expect("failed to stream expired"); assert!(expired.is_empty(), "GC manifest should be empty"); - // Verify physical files are gone assert!( !revision_dir.join("file_a.parquet").exists(), "file_a.parquet should be deleted from disk" @@ -195,30 +217,13 @@ async fn gc_deletes_expired_files() { #[tokio::test] async fn gc_with_no_expired_files_is_a_noop() { //* Given - let (_db, conn) = setup_test_db().await; - - let temp_dir = create_temp_dir("noop"); - let data_url = - amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) - .expect("Failed to create object store URL"); - - let data_store = - DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); + let ctx = GcTestCtx::setup("gc_noop").await; let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; - let location_id = register_table_and_revision(&conn, revision_path).await; - - // No files registered, no GC manifest entries + let location_id = ctx.register_revision(revision_path).await; //* When - let ctx = Context { - metadata_db: conn.clone(), - data_store, - meter: None, - }; - let desc = JobDescriptor { location_id }; - - amp_worker_gc::job_impl::execute(ctx, desc) + amp_worker_gc::job_impl::execute(ctx.gc_context(), JobDescriptor { location_id }) .await .expect("GC execution should succeed with no expired files"); @@ -228,27 +233,14 @@ async fn gc_with_no_expired_files_is_a_noop() { #[tokio::test] async fn gc_with_nonexistent_location_returns_error() { //* Given - let (_db, conn) = setup_test_db().await; - - let temp_dir = create_temp_dir("nonexistent"); - let data_url = - amp_object_store::url::ObjectStoreUrl::new(format!("file://{}/", temp_dir.display())) - .expect("Failed to create object store URL"); - - let data_store = - DataStore::new(conn.clone(), data_url, 0).expect("Failed to create data store"); + let ctx = 
GcTestCtx::setup("gc_nonexistent").await; - //* When - let ctx = Context { - metadata_db: conn.clone(), - data_store, - meter: None, - }; let desc = JobDescriptor { location_id: LocationId::try_from(999999i64).unwrap(), }; - let result = amp_worker_gc::job_impl::execute(ctx, desc).await; + //* When + let result = amp_worker_gc::job_impl::execute(ctx.gc_context(), desc).await; //* Then assert!(result.is_err(), "should fail with LocationNotFound"); @@ -268,45 +260,35 @@ async fn gc_with_nonexistent_location_returns_error() { #[tokio::test] async fn locations_with_expired_entries_returns_only_locations_with_expired_files() { //* Given - let (_db, conn) = setup_test_db().await; + let ctx = GcTestCtx::setup("gc_prefilter_expired_only").await; - // Create two revisions - let loc_a = register_table_and_revision(&conn, "test_ns/ds_a/aaaaaaaaaa/table_a").await; - let loc_b = register_table_and_revision(&conn, "test_ns/ds_b/aaaaaaaaaa/table_b").await; + let loc_a = ctx + .register_revision("test_ns/ds_a/aaaaaaaaaa/table_a") + .await; + let loc_b = ctx + .register_revision("test_ns/ds_b/aaaaaaaaaa/table_b") + .await; - // Register a file for each let url_a = Url::parse("file:///tmp/a.parquet").unwrap(); let url_b = Url::parse("file:///tmp/b.parquet").unwrap(); - register_file(&conn, loc_a, "a.parquet", &url_a).await; - register_file(&conn, loc_b, "b.parquet", &url_b).await; + ctx.register_file(loc_a, "a.parquet", &url_a).await; + ctx.register_file(loc_b, "b.parquet", &url_b).await; - // Get file IDs - let file_ids_a: Vec = files::stream_by_location_id_with_details(&conn, loc_a) - .map_ok(|f| f.id) - .try_collect() - .await - .unwrap(); - let file_ids_b: Vec = files::stream_by_location_id_with_details(&conn, loc_b) - .map_ok(|f| f.id) - .try_collect() - .await - .unwrap(); + let file_ids_a = ctx.file_ids(loc_a).await; + let file_ids_b = ctx.file_ids(loc_b).await; - // Only schedule loc_a for GC (with 1s expiry), leave loc_b without GC entries - gc::upsert(&conn, loc_a, 
&file_ids_a, Duration::from_secs(1)) + // loc_a: 1s expiry (will expire), loc_b: 1h expiry (won't expire) + gc::upsert(&ctx.conn, loc_a, &file_ids_a, Duration::from_secs(1)) .await .unwrap(); - - // Also schedule loc_b but with a long expiry (not yet expired) - gc::upsert(&conn, loc_b, &file_ids_b, Duration::from_secs(3600)) + gc::upsert(&ctx.conn, loc_b, &file_ids_b, Duration::from_secs(3600)) .await .unwrap(); - // Wait for loc_a's entries to expire tokio::time::sleep(Duration::from_secs(2)).await; //* When - let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); //* Then — only loc_a should appear (expired), not loc_b (not yet expired) assert_eq!( @@ -320,25 +302,23 @@ async fn locations_with_expired_entries_returns_only_locations_with_expired_file #[tokio::test] async fn locations_with_expired_entries_returns_empty_when_no_expired_files() { //* Given - let (_db, conn) = setup_test_db().await; + let ctx = GcTestCtx::setup("gc_prefilter_empty").await; + + let loc = ctx + .register_revision("test_ns/ds_empty/aaaaaaaaaa/table_empty") + .await; - let loc = register_table_and_revision(&conn, "test_ns/ds_empty/aaaaaaaaaa/table_empty").await; let url = Url::parse("file:///tmp/empty.parquet").unwrap(); - register_file(&conn, loc, "empty.parquet", &url).await; + ctx.register_file(loc, "empty.parquet", &url).await; - let file_ids: Vec = files::stream_by_location_id_with_details(&conn, loc) - .map_ok(|f| f.id) - .try_collect() - .await - .unwrap(); + let file_ids = ctx.file_ids(loc).await; - // Schedule with a long expiry — nothing should be expired - gc::upsert(&conn, loc, &file_ids, Duration::from_secs(3600)) + gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(3600)) .await .unwrap(); //* When - let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); //* Then assert!( 
@@ -350,30 +330,26 @@ async fn locations_with_expired_entries_returns_empty_when_no_expired_files() { #[tokio::test] async fn locations_with_expired_entries_includes_deactivated_revisions() { //* Given - let (_db, conn) = setup_test_db().await; + let ctx = GcTestCtx::setup("gc_prefilter_deactivated").await; // Register a revision (inactive by default — never called mark_active_by_id) - let loc = - register_table_and_revision(&conn, "test_ns/ds_inactive/aaaaaaaaaa/table_inactive").await; + let loc = ctx + .register_revision("test_ns/ds_inactive/aaaaaaaaaa/table_inactive") + .await; let url = Url::parse("file:///tmp/inactive.parquet").unwrap(); - register_file(&conn, loc, "inactive.parquet", &url).await; + ctx.register_file(loc, "inactive.parquet", &url).await; - let file_ids: Vec = files::stream_by_location_id_with_details(&conn, loc) - .map_ok(|f| f.id) - .try_collect() - .await - .unwrap(); + let file_ids = ctx.file_ids(loc).await; - // Schedule for GC with 1s expiry - gc::upsert(&conn, loc, &file_ids, Duration::from_secs(1)) + gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(1)) .await .unwrap(); tokio::time::sleep(Duration::from_secs(2)).await; //* When - let locations = gc::locations_with_expired_entries(&conn).await.unwrap(); + let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); //* Then — deactivated revision should still appear assert_eq!( From 516f0acf8be8a3eadd87d9f46d7587b0dc7191c9 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Fri, 20 Mar 2026 12:42:07 -0500 Subject: [PATCH 09/11] chore: update Cargo.lock after removing pgtemp from tests --- Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3e2992bea..1bc547c2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12453,7 +12453,6 @@ dependencies = [ "object_store", "opentelemetry", "opentelemetry_sdk", - "pgtemp", "pretty_assertions", "rand 0.9.2", "reqwest 0.13.2", From cf110ba54a7b5a2bdfab53d8d9666992b8555130 Mon Sep 17 00:00:00 
2001 From: Mitchell Spencer Date: Fri, 20 Mar 2026 14:22:31 -0500 Subject: [PATCH 10/11] test(tests): add scheduler dedup and throttle tests for GC jobs Add two integration tests covering the scheduler's in-flight skip and recently-completed cooldown logic in schedule_gc_jobs(), plus helper methods on GcTestCtx to reduce boilerplate. --- tests/src/tests/it_gc.rs | 196 +++++++++++++++++++++++++++++++-------- 1 file changed, 155 insertions(+), 41 deletions(-) diff --git a/tests/src/tests/it_gc.rs b/tests/src/tests/it_gc.rs index d625ebc62..71585007e 100644 --- a/tests/src/tests/it_gc.rs +++ b/tests/src/tests/it_gc.rs @@ -8,6 +8,7 @@ use std::time::Duration; use amp_data_store::DataStore; +use amp_worker_core::jobs::job_id::JobId; use amp_worker_gc::{job_ctx::Context, job_descriptor::JobDescriptor}; use futures::TryStreamExt; use metadata_db::{ @@ -134,6 +135,36 @@ impl GcTestCtx { meter: None, } } + + /// Set up a location with an expired GC entry and return its location ID and idempotency key. + /// + /// Registers a revision, a file, upserts a 1s GC entry, and waits for expiry. + async fn setup_expired_location( + &self, + path: &str, + file_name: &str, + ) -> (LocationId, metadata_db::jobs::IdempotencyKey<'static>) { + let loc = self.register_revision(path).await; + let url = Url::parse(&format!("file:///tmp/{file_name}")).unwrap(); + self.register_file(loc, file_name, &url).await; + let file_ids = self.file_ids(loc).await; + gc::upsert(&self.conn, loc, &file_ids, Duration::from_secs(1)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; + let key = amp_worker_gc::job_key::idempotency_key(loc); + (loc, key) + } + + /// Transition a job from Scheduled → Running → Completed. 
+ async fn complete_job(&self, job_id: JobId) { + metadata_db::job_status::mark_running(&self.conn, job_id) + .await + .expect("failed to mark running"); + metadata_db::job_status::mark_completed(&self.conn, job_id) + .await + .expect("failed to mark completed"); + } } // --- GC execution tests --- @@ -253,14 +284,14 @@ async fn gc_with_nonexistent_location_returns_error() { ); } -// --- Scheduling pre-filter tests --- -// These test the `gc::locations_with_expired_entries` query that the controller -// scheduler uses to pre-filter which locations need GC. +// --- Scheduler tests --- +// These test the controller's schedule_gc_jobs() method to verify it only +// schedules GC jobs for locations that actually have expired entries. #[tokio::test] -async fn locations_with_expired_entries_returns_only_locations_with_expired_files() { +async fn scheduler_only_schedules_gc_for_locations_with_expired_entries() { //* Given - let ctx = GcTestCtx::setup("gc_prefilter_expired_only").await; + let ctx = GcTestCtx::setup("gc_scheduler_prefilter").await; let loc_a = ctx .register_revision("test_ns/ds_a/aaaaaaaaaa/table_a") @@ -287,78 +318,161 @@ async fn locations_with_expired_entries_returns_only_locations_with_expired_file tokio::time::sleep(Duration::from_secs(2)).await; + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + //* When - let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("schedule_gc_jobs should succeed"); - //* Then — only loc_a should appear (expired), not loc_b (not yet expired) - assert_eq!( - locations.len(), - 1, - "only one location should have expired entries" + //* Then — only loc_a should have a job scheduled, not loc_b + let key_a = amp_worker_gc::job_key::idempotency_key(loc_a); + let key_b = amp_worker_gc::job_key::idempotency_key(loc_b); + + let job_a = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key_a) + .await 
+ .expect("failed to query job for loc_a"); + let job_b = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key_b) + .await + .expect("failed to query job for loc_b"); + + assert!( + job_a.is_some(), + "scheduler should have created a job for loc_a (expired entries)" + ); + assert!( + job_b.is_none(), + "scheduler should NOT have created a job for loc_b (no expired entries)" ); - assert_eq!(locations[0], loc_a, "the expired location should be loc_a"); } #[tokio::test] -async fn locations_with_expired_entries_returns_empty_when_no_expired_files() { +async fn scheduler_schedules_gc_for_deactivated_revisions() { //* Given - let ctx = GcTestCtx::setup("gc_prefilter_empty").await; + let ctx = GcTestCtx::setup("gc_scheduler_deactivated").await; + // Register a revision (inactive by default — never called mark_active_by_id) let loc = ctx - .register_revision("test_ns/ds_empty/aaaaaaaaaa/table_empty") + .register_revision("test_ns/ds_inactive/aaaaaaaaaa/table_inactive") .await; - let url = Url::parse("file:///tmp/empty.parquet").unwrap(); - ctx.register_file(loc, "empty.parquet", &url).await; + let url = Url::parse("file:///tmp/inactive.parquet").unwrap(); + ctx.register_file(loc, "inactive.parquet", &url).await; let file_ids = ctx.file_ids(loc).await; - gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(3600)) + gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(1)) .await .unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + //* When - let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("schedule_gc_jobs should succeed"); + + //* Then — deactivated revision should still get a GC job + let key = amp_worker_gc::job_key::idempotency_key(loc); + let job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job"); - //* Then 
assert!( - locations.is_empty(), - "no locations should have expired entries" + job.is_some(), + "scheduler should schedule GC for deactivated revisions" ); } #[tokio::test] -async fn locations_with_expired_entries_includes_deactivated_revisions() { +async fn scheduler_skips_in_flight_gc_jobs() { //* Given - let ctx = GcTestCtx::setup("gc_prefilter_deactivated").await; + let ctx = GcTestCtx::setup("gc_scheduler_in_flight").await; - // Register a revision (inactive by default — never called mark_active_by_id) - let loc = ctx - .register_revision("test_ns/ds_inactive/aaaaaaaaaa/table_inactive") + let (_loc, key) = ctx + .setup_expired_location( + "test_ns/ds_inflight/aaaaaaaaaa/table_inflight", + "inflight.parquet", + ) .await; - let url = Url::parse("file:///tmp/inactive.parquet").unwrap(); - ctx.register_file(loc, "inactive.parquet", &url).await; + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); - let file_ids = ctx.file_ids(loc).await; + // First call: creates a Scheduled job + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("first schedule_gc_jobs should succeed"); - gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(1)) + let first_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) .await - .unwrap(); + .expect("failed to query job") + .expect("job should exist after first scheduling"); - tokio::time::sleep(Duration::from_secs(2)).await; + //* When — call again while the job is still in-flight (Scheduled) + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("second schedule_gc_jobs should succeed"); - //* When - let locations = gc::locations_with_expired_entries(&ctx.conn).await.unwrap(); + //* Then — the job should be unchanged (same ID, still Scheduled) + let second_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should still exist"); - //* Then — deactivated revision should still appear 
assert_eq!( - locations.len(), - 1, - "deactivated revision should have expired entries" + first_job.id, second_job.id, + "scheduler should not create a duplicate job while one is in-flight" ); +} + +#[tokio::test] +async fn scheduler_skips_recently_completed_gc_jobs() { + //* Given + let ctx = GcTestCtx::setup("gc_scheduler_too_recent").await; + + let (_loc, key) = ctx + .setup_expired_location( + "test_ns/ds_recent/aaaaaaaaaa/table_recent", + "recent.parquet", + ) + .await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + + // Schedule and complete a GC job + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("initial schedule_gc_jobs should succeed"); + + let job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should exist"); + + ctx.complete_job(job.id.into()).await; + + //* When — schedule again with the default gc_interval (60s) + scheduler + .schedule_gc_jobs(Duration::from_secs(60)) + .await + .expect("schedule_gc_jobs should succeed"); + + //* Then — the job should still be in Completed state, not rescheduled + let after_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should still exist"); + assert_eq!( - locations[0], loc, - "the expired location should be the deactivated one" + after_job.status, + amp_worker_core::jobs::status::JobStatus::Completed.into(), + "scheduler should not reschedule a recently completed GC job" ); } From 1edae459c68fc9accc0fab853a84f4e9046eb59a Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Sun, 22 Mar 2026 15:30:37 -0500 Subject: [PATCH 11/11] fix(controller): make scheduler module public for integration tests --- crates/services/controller/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/controller/src/lib.rs b/crates/services/controller/src/lib.rs index a06cb0a94..9c2c99f6d 
100644 --- a/crates/services/controller/src/lib.rs +++ b/crates/services/controller/src/lib.rs @@ -2,7 +2,7 @@ //! //! The controller service provides the admin API for managing Amp operations. -mod scheduler; +pub mod scheduler; pub mod service; // Re-export build_info to avoid code duplication