diff --git a/Cargo.lock b/Cargo.lock index 11ef46e25..1bc547c2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -36,6 +36,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "axum", "common", @@ -1320,6 +1321,23 @@ dependencies = [ "verification", ] +[[package]] +name = "amp-worker-gc" +version = "0.1.0" +dependencies = [ + "amp-data-store", + "amp-worker-core", + "datasets-common", + "futures", + "metadata-db", + "monitoring", + "object_store", + "serde", + "serde_json", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "ampcc" version = "0.1.0" @@ -3249,10 +3267,12 @@ name = "controller" version = "0.1.0" dependencies = [ "admin-api", + "amp-config", "amp-data-store", "amp-datasets-registry", "amp-providers-registry", "amp-worker-core", + "amp-worker-gc", "async-trait", "axum", "common", @@ -12406,6 +12426,7 @@ dependencies = [ "amp-providers-registry", "amp-providers-solana", "amp-worker-core", + "amp-worker-gc", "ampctl", "anyhow", "arrow-flight", @@ -14307,6 +14328,7 @@ dependencies = [ "amp-worker-core", "amp-worker-datasets-derived", "amp-worker-datasets-raw", + "amp-worker-gc", "async-trait", "backon", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 6599b53ca..2b9c31ac3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "crates/core/worker-core", "crates/core/worker-datasets-derived", "crates/core/worker-datasets-raw", + "crates/core/worker-gc", "crates/parquet-ext", "crates/extractors/evm-rpc", "crates/extractors/evm-rpc/gen", diff --git a/crates/bin/ampctl/src/cmd/job/list.rs b/crates/bin/ampctl/src/cmd/job/list.rs index 84972e5fd..d1119788a 100644 --- a/crates/bin/ampctl/src/cmd/job/list.rs +++ b/crates/bin/ampctl/src/cmd/job/list.rs @@ -133,6 +133,7 @@ fn show_dataset_descriptor(descriptor: &serde_json::Value) -> Option { dataset_name = desc.dataset_name, hash = &desc.manifest_hash.as_str()[..7], )), + JobDescriptor::Gc(desc) => 
Some(format!("gc location:{}", desc.location_id)), } } diff --git a/crates/bin/ampd/src/controller_cmd.rs b/crates/bin/ampd/src/controller_cmd.rs index 8609b47b6..08d645e7b 100644 --- a/crates/bin/ampd/src/controller_cmd.rs +++ b/crates/bin/ampd/src/controller_cmd.rs @@ -72,6 +72,7 @@ pub async fn run( ethcall_udfs_cache, meter, at, + config.gc_scheduling, ) .await .map_err(Error::ServiceInit)?; diff --git a/crates/bin/ampd/src/solo_cmd.rs b/crates/bin/ampd/src/solo_cmd.rs index 9ef623895..ff2d562a4 100644 --- a/crates/bin/ampd/src/solo_cmd.rs +++ b/crates/bin/ampd/src/solo_cmd.rs @@ -136,6 +136,7 @@ pub async fn run( ethcall_udfs_cache.clone(), meter.clone(), config.controller_addrs.admin_api_addr, + config.gc_scheduling.clone(), ) .await .map_err(Error::ServiceInit)?; diff --git a/crates/config/src/config_file.rs b/crates/config/src/config_file.rs index ea3215a40..cd5a63211 100644 --- a/crates/config/src/config_file.rs +++ b/crates/config/src/config_file.rs @@ -201,6 +201,10 @@ pub struct ConfigFile { // Worker event streaming configuration #[serde(default)] pub worker_events: crate::WorkerEventsConfig, + + // Controller GC scheduling configuration + #[serde(default)] + pub gc_scheduling: crate::controller::GcSchedulingConfig, } impl ConfigFile { diff --git a/crates/config/src/controller.rs b/crates/config/src/controller.rs index 635d85697..63101b439 100644 --- a/crates/config/src/controller.rs +++ b/crates/config/src/controller.rs @@ -1,6 +1,9 @@ -use std::net::{AddrParseError, SocketAddr}; +use std::{ + net::{AddrParseError, SocketAddr}, + time::Duration, +}; -use crate::config_file::ConfigFile; +use crate::{config_file::ConfigFile, worker_core::ConfigDuration}; /// Default port for the Admin API server. pub const DEFAULT_CONTROLLER_ADMIN_API_PORT: u16 = 1610; @@ -12,6 +15,35 @@ pub struct ControllerAddrs { pub admin_api_addr: SocketAddr, } +/// Configuration for controller-managed garbage collection scheduling. 
+/// +/// Controls whether the controller schedules GC jobs and at what interval. +/// GC jobs are executed by workers, not the controller. +#[derive(Debug, Clone, Default, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct GcSchedulingConfig { + /// Enable GC job scheduling (default: `false`). + /// + /// When `false`, the controller will not schedule any GC jobs. + /// Enable only after verifying the legacy worker-internal GC is disabled. + #[serde(default)] + pub enabled: bool, + + /// Interval in seconds between GC scheduling sweeps (default: 60). + /// + /// The controller queries the GC manifest for locations with expired entries at + /// this interval and schedules a GC job for each one. + #[serde(default)] + pub interval: ConfigDuration<60>, +} + +impl GcSchedulingConfig { + /// Returns the scheduling interval as a [`Duration`]. + pub fn interval_duration(&self) -> Duration { + self.interval.clone().into() + } +} + impl Default for ControllerAddrs { fn default() -> Self { Self { diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index f6d12f0c0..f15fd9f0e 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -20,6 +20,7 @@ pub use self::{ DEFAULT_MANIFESTS_DIRNAME, DEFAULT_MICROBATCH_MAX_INTERVAL, DEFAULT_PROVIDERS_DIRNAME, DEFAULT_SERVER_MICROBATCH_MAX_INTERVAL, no_defaults_override, }, + controller::GcSchedulingConfig, metadb::{DEFAULT_METADB_CONN_POOL_SIZE, DEFAULT_METADB_DIRNAME, MetadataDbConfig}, redacted::Redacted, }; @@ -112,6 +113,7 @@ fn resolve_config( poll_interval: config_file.poll_interval_secs.into(), keep_alive_interval: config_file.keep_alive_interval, worker_events, + gc_scheduling: config_file.gc_scheduling, }) } @@ -230,6 +232,8 @@ pub struct Config { pub keep_alive_interval: u64, /// Worker event streaming configuration. pub worker_events: WorkerEventsConfig, + /// GC scheduling configuration for the controller. 
+ pub gc_scheduling: GcSchedulingConfig, } /// Configuration for worker event streaming. diff --git a/crates/core/metadata-db/src/gc.rs b/crates/core/metadata-db/src/gc.rs index 3f3baa5ae..f24242a95 100644 --- a/crates/core/metadata-db/src/gc.rs +++ b/crates/core/metadata-db/src/gc.rs @@ -30,6 +30,17 @@ where .map_err(Error::Database) } +/// Returns distinct location IDs that have at least one expired entry in the GC manifest. +#[tracing::instrument(skip(exe), err)] +pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result, Error> +where + E: Executor<'c>, +{ + sql::locations_with_expired_entries(exe) + .await + .map_err(Error::Database) +} + /// Streams files that have passed their expiration time and are ready for deletion. #[tracing::instrument(skip(exe))] pub fn stream_expired<'c, E>( diff --git a/crates/core/metadata-db/src/gc/sql.rs b/crates/core/metadata-db/src/gc/sql.rs index 51a101755..25050b5f6 100644 --- a/crates/core/metadata-db/src/gc/sql.rs +++ b/crates/core/metadata-db/src/gc/sql.rs @@ -82,6 +82,21 @@ where Box::pin(sqlx::query_as(query).bind(location_id).fetch(exe)) } +/// Returns distinct location IDs that have at least one expired entry in the GC manifest. +pub async fn locations_with_expired_entries<'c, E>(exe: E) -> Result, sqlx::Error> +where + E: Executor<'c, Database = Postgres>, +{ + let query = indoc::indoc! 
{r#" + SELECT DISTINCT location_id + FROM gc_manifest + WHERE expiration < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + ORDER BY location_id; + "#}; + + sqlx::query_scalar(query).fetch_all(exe).await +} + /// GC manifest row from the database #[derive(Debug, sqlx::FromRow)] pub struct GcManifestRow { diff --git a/crates/core/monitoring/src/logging.rs b/crates/core/monitoring/src/logging.rs index 35432863a..cd684e922 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -81,6 +81,7 @@ const AMP_CRATES: &[&str] = &[ "amp_worker_core", "amp_worker_datasets_derived", "amp_worker_datasets_raw", + "amp_worker_gc", "ampcc", "ampctl", "ampd", diff --git a/crates/core/worker-gc/Cargo.toml b/crates/core/worker-gc/Cargo.toml new file mode 100644 index 000000000..3b81d5673 --- /dev/null +++ b/crates/core/worker-gc/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "amp-worker-gc" +edition.workspace = true +version.workspace = true +license-file.workspace = true + +[dependencies] +amp-data-store = { path = "../data-store" } +amp-worker-core = { path = "../worker-core" } +datasets-common = { path = "../datasets-common" } +futures.workspace = true +metadata-db = { path = "../metadata-db" } +monitoring = { path = "../monitoring" } +object_store.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tracing.workspace = true diff --git a/crates/core/worker-gc/src/error.rs b/crates/core/worker-gc/src/error.rs new file mode 100644 index 000000000..efb55cd31 --- /dev/null +++ b/crates/core/worker-gc/src/error.rs @@ -0,0 +1,77 @@ +//! Error types for GC job execution. + +use amp_worker_core::{error_detail::ErrorDetailsProvider, retryable::RetryableErrorExt}; + +/// Errors that can occur during garbage collection job execution. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The target physical table revision does not exist. 
+ /// + /// This occurs when the `location_id` in the job descriptor does not match any + /// revision in the metadata database. The revision may have been deleted between + /// scheduling and execution. + /// + /// This is a fatal (non-retryable) error since the location will not reappear. + #[error("location not found: {0}")] + LocationNotFound(metadata_db::physical_table_revision::LocationId), + + /// Failed to query the metadata database for the physical table revision. + /// + /// This occurs when looking up the revision by `location_id` at the start of + /// the GC job. Common causes include database connectivity issues or timeouts. + #[error("metadata database error")] + MetadataDb(#[source] metadata_db::Error), + + /// Failed to stream expired files from the GC manifest. + /// + /// This occurs during step 1 of the collection algorithm when querying + /// `gc_manifest` for files whose expiration has passed. Common causes include + /// database connectivity issues or query timeouts. + #[error("failed to stream expired files")] + FileStream(#[source] metadata_db::Error), + + /// Failed to delete file metadata records from Postgres. + /// + /// This occurs during step 2 of the collection algorithm when removing + /// `file_metadata` rows for expired files. Common causes include database + /// connectivity issues or transaction conflicts. + #[error("failed to delete file metadata")] + FileMetadataDelete(#[source] metadata_db::Error), + + /// Failed to delete a physical file from object storage. + /// + /// This occurs during step 3 of the collection algorithm when removing + /// Parquet files from S3/GCS/local storage. Common causes include network + /// failures, permission errors, or storage service unavailability. + /// + /// Note: `NotFound` errors are tolerated (the file is already gone). + /// Only other object store errors trigger this variant. 
+ #[error("object store error")] + ObjectStore(#[source] object_store::Error), +} + +impl RetryableErrorExt for Error { + fn is_retryable(&self) -> bool { + match self { + Self::LocationNotFound(_) => false, + Self::MetadataDb(_) => true, + Self::FileStream(_) => true, + Self::FileMetadataDelete(_) => true, + Self::ObjectStore(_) => true, + } + } +} + +impl amp_worker_core::retryable::JobErrorExt for Error { + fn error_code(&self) -> &'static str { + match self { + Self::LocationNotFound(_) => "GC_LOCATION_NOT_FOUND", + Self::MetadataDb(_) => "GC_METADATA_DB", + Self::FileStream(_) => "GC_FILE_STREAM", + Self::FileMetadataDelete(_) => "GC_FILE_METADATA_DELETE", + Self::ObjectStore(_) => "GC_OBJECT_STORE", + } + } +} + +impl ErrorDetailsProvider for Error {} diff --git a/crates/core/worker-gc/src/job_ctx.rs b/crates/core/worker-gc/src/job_ctx.rs new file mode 100644 index 000000000..696fcb491 --- /dev/null +++ b/crates/core/worker-gc/src/job_ctx.rs @@ -0,0 +1,15 @@ +//! Context for GC job execution. + +use amp_data_store::DataStore; +use metadata_db::MetadataDb; +use monitoring::telemetry::metrics::Meter; + +/// Dependencies required to execute a GC job. +pub struct Context { + /// Connection to the metadata database (Postgres). + pub metadata_db: MetadataDb, + /// Connection to object storage (S3/GCS/local FS). + pub data_store: DataStore, + /// OpenTelemetry meter for recording GC metrics. + pub meter: Option, +} diff --git a/crates/core/worker-gc/src/job_descriptor.rs b/crates/core/worker-gc/src/job_descriptor.rs new file mode 100644 index 000000000..32b0352a4 --- /dev/null +++ b/crates/core/worker-gc/src/job_descriptor.rs @@ -0,0 +1,56 @@ +use metadata_db::physical_table_revision::LocationId; + +use crate::job_kind::GcJobKind; + +/// Job descriptor for garbage collection. +/// +/// Contains the fields needed to execute a GC job for a single physical table revision. 
+#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct JobDescriptor { + /// The physical table revision to collect garbage for. + pub location_id: LocationId, +} + +impl From for metadata_db::job_events::EventDetailOwned { + fn from(desc: JobDescriptor) -> Self { + #[derive(serde::Serialize)] + struct Tagged<'a> { + kind: GcJobKind, + #[serde(flatten)] + inner: &'a JobDescriptor, + } + + // SAFETY: `to_raw_value` only fails on non-string map keys which cannot occur + // with a flat struct containing only a primitive integer field. + let raw = serde_json::value::to_raw_value(&Tagged { + kind: GcJobKind, + inner: &desc, + }) + .expect("JobDescriptor serialization is infallible"); + + metadata_db::job_events::EventDetail::from_owned_unchecked(raw) + } +} + +impl TryFrom<&metadata_db::job_events::EventDetail<'_>> for JobDescriptor { + type Error = InvalidJobDescriptorError; + + fn try_from(raw: &metadata_db::job_events::EventDetail<'_>) -> Result { + #[derive(serde::Deserialize)] + struct TaggedOwned { + #[allow(dead_code)] + kind: GcJobKind, + #[serde(flatten)] + inner: JobDescriptor, + } + + let tagged: TaggedOwned = + serde_json::from_str(raw.as_str()).map_err(InvalidJobDescriptorError)?; + Ok(tagged.inner) + } +} + +/// Error returned when an [`EventDetail`] cannot be converted into a [`JobDescriptor`]. +#[derive(Debug, thiserror::Error)] +#[error("invalid job descriptor")] +pub struct InvalidJobDescriptorError(#[source] pub serde_json::Error); diff --git a/crates/core/worker-gc/src/job_impl.rs b/crates/core/worker-gc/src/job_impl.rs new file mode 100644 index 000000000..3d35a26b9 --- /dev/null +++ b/crates/core/worker-gc/src/job_impl.rs @@ -0,0 +1,148 @@ +//! GC job execution. +//! +//! Implements the collection algorithm: stream expired files from the GC manifest, +//! delete their metadata from Postgres, then delete the physical files from object storage. 
+ +use std::collections::{BTreeMap, BTreeSet}; + +use amp_data_store::{DataStore, DeleteFilesStreamError, physical_table::PhyTableRevisionPath}; +use futures::{StreamExt as _, TryStreamExt as _, stream}; +use metadata_db::{ + MetadataDb, files::FileId, gc::GcManifestRow, physical_table_revision::LocationId, +}; +use object_store::{Error as ObjectStoreError, path::Path}; + +use crate::{error::Error, job_ctx::Context, job_descriptor::JobDescriptor, metrics::GcMetrics}; + +/// Execute a garbage collection job for a single physical table revision. +/// +/// The algorithm: +/// 1. Look up the revision by `location_id` to get its storage path. +/// 2. Stream expired files from the `gc_manifest` table. +/// 3. Delete file metadata rows from Postgres (cascades to `gc_manifest` and `footer_cache`). +/// 4. Delete physical files from object storage. +/// +/// Metadata is deleted before physical files. If the process crashes between steps 3 and 4, +/// orphaned files may remain in storage but no dangling metadata references will exist. +#[tracing::instrument(skip_all, err, fields(location_id = %desc.location_id, table_ref))] +pub async fn execute(ctx: Context, desc: JobDescriptor) -> Result<(), Error> { + let location_id = desc.location_id; + + // Look up the revision to get its storage path + let revision = + metadata_db::physical_table_revision::get_by_location_id(&ctx.metadata_db, location_id) + .await + .map_err(Error::MetadataDb)? + .ok_or(Error::LocationNotFound(location_id))?; + + // Record table_ref on the current span now that we have the revision path + tracing::Span::current().record("table_ref", revision.path.as_str()); + + let revision_path: PhyTableRevisionPath = revision.path.into(); + let metrics = ctx.meter.as_ref().map(|m| GcMetrics::new(m, *location_id)); + + collect( + &ctx.metadata_db, + &ctx.data_store, + location_id, + &revision_path, + metrics.as_ref(), + ) + .await +} + +/// Run the collection algorithm for a single location. 
+async fn collect( + metadata_db: &MetadataDb, + data_store: &DataStore, + location_id: LocationId, + revision_path: &PhyTableRevisionPath, + metrics: Option<&GcMetrics>, +) -> Result<(), Error> { + // Step 1: Stream expired files from the GC manifest + let found_file_ids_to_paths: BTreeMap = + metadata_db::gc::stream_expired(metadata_db, location_id) + .map_err(Error::FileStream) + .map(|manifest_row| { + let GcManifestRow { + file_id, + file_path: file_name, + .. + } = manifest_row?; + + let path = revision_path.child(file_name.as_str()); + Ok::<_, Error>((file_id, path)) + }) + .try_collect() + .await?; + + tracing::debug!( + expired_files = found_file_ids_to_paths.len(), + "expired files found" + ); + + if found_file_ids_to_paths.is_empty() { + return Ok(()); + } + + if let Some(m) = metrics { + m.record_expired_files_found(found_file_ids_to_paths.len() as u64); + } + + // Step 2: Delete file metadata from Postgres + let file_ids_to_delete: Vec = found_file_ids_to_paths.keys().copied().collect(); + let paths_to_remove: BTreeSet = + metadata_db::files::delete_by_ids(metadata_db, &file_ids_to_delete) + .await + .map_err(Error::FileMetadataDelete)? + .into_iter() + .filter_map(|file_id| found_file_ids_to_paths.get(&file_id).cloned()) + .collect(); + + tracing::debug!( + metadata_entries_deleted = paths_to_remove.len(), + "metadata entries deleted" + ); + + if let Some(m) = metrics { + m.record_metadata_entries_deleted(paths_to_remove.len() as u64); + } + + // Step 3: Delete physical files from object storage + let mut delete_stream = + data_store.delete_files_stream(stream::iter(paths_to_remove).map(Ok).boxed()); + + let mut files_deleted: u64 = 0; + let mut files_not_found: u64 = 0; + + while let Some(result) = delete_stream.next().await { + match result { + Ok(path) => { + tracing::debug!(%path, "deleted expired file"); + files_deleted += 1; + } + Err(DeleteFilesStreamError(ObjectStoreError::NotFound { path, .. 
})) => { + tracing::debug!(%path, "expired file not found"); + files_not_found += 1; + } + Err(DeleteFilesStreamError(err)) => { + tracing::warn!( + error = %err, + files_deleted, + files_not_found, + "collection aborted due to object store error" + ); + return Err(Error::ObjectStore(err)); + } + } + } + + if let Some(m) = metrics { + m.record_files_deleted(files_deleted); + m.record_files_not_found(files_not_found); + } + + tracing::info!(files_deleted, files_not_found, "collection complete"); + + Ok(()) +} diff --git a/crates/core/worker-gc/src/job_key.rs b/crates/core/worker-gc/src/job_key.rs new file mode 100644 index 000000000..059e87b0b --- /dev/null +++ b/crates/core/worker-gc/src/job_key.rs @@ -0,0 +1,21 @@ +//! Idempotency key computation for garbage collection jobs. +//! +//! Computes idempotency keys by combining the GC job kind discriminator +//! with a location ID, producing a deterministic hash that prevents duplicate +//! job scheduling for the same physical table revision. + +use datasets_common::hash::Hash; +use metadata_db::{jobs::IdempotencyKey, physical_table_revision::LocationId}; + +use crate::job_kind::JOB_KIND; + +/// Compute an idempotency key for a GC job. +/// +/// The key is derived by hashing `{job_kind}:{location_id}`, +/// producing a deterministic 64-character hex string. +pub fn idempotency_key(location_id: LocationId) -> IdempotencyKey<'static> { + let input = format!("{JOB_KIND}:{location_id}"); + let hash: Hash = datasets_common::hash::hash(input); + // SAFETY: The hash is a validated 64-char hex string produced by our hash function. + IdempotencyKey::from_owned_unchecked(hash.into_inner()) +} diff --git a/crates/core/worker-gc/src/job_kind.rs b/crates/core/worker-gc/src/job_kind.rs new file mode 100644 index 000000000..1df5d8752 --- /dev/null +++ b/crates/core/worker-gc/src/job_kind.rs @@ -0,0 +1,50 @@ +/// The `kind` discriminator value for garbage collection jobs. 
+pub const JOB_KIND: &str = "gc"; + +/// Type-safe representation of the garbage collection job kind. +/// +/// Serializes as the string [`JOB_KIND`] and only deserializes successfully +/// when the input matches that exact value. +#[derive(Debug)] +pub struct GcJobKind; + +impl std::fmt::Display for GcJobKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(JOB_KIND) + } +} + +impl std::str::FromStr for GcJobKind { + type Err = JobKindMismatchError; + + fn from_str(s: &str) -> Result { + if s != JOB_KIND { + return Err(JobKindMismatchError(s.to_owned())); + } + Ok(GcJobKind) + } +} + +impl serde::Serialize for GcJobKind { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(JOB_KIND) + } +} + +impl<'de> serde::Deserialize<'de> for GcJobKind { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + s.parse().map_err(serde::de::Error::custom) + } +} + +/// Error returned when a string does not match the expected [`JOB_KIND`]. +#[derive(Debug, thiserror::Error)] +#[error("job kind mismatch: expected `{JOB_KIND}`, got `{0}`")] +pub struct JobKindMismatchError(String); diff --git a/crates/core/worker-gc/src/lib.rs b/crates/core/worker-gc/src/lib.rs new file mode 100644 index 000000000..3619a08ef --- /dev/null +++ b/crates/core/worker-gc/src/lib.rs @@ -0,0 +1,7 @@ +pub mod error; +pub mod job_ctx; +pub mod job_descriptor; +pub mod job_impl; +pub mod job_key; +pub mod job_kind; +pub mod metrics; diff --git a/crates/core/worker-gc/src/metrics.rs b/crates/core/worker-gc/src/metrics.rs new file mode 100644 index 000000000..9286cbd63 --- /dev/null +++ b/crates/core/worker-gc/src/metrics.rs @@ -0,0 +1,62 @@ +//! Metrics for GC job execution. + +use monitoring::telemetry::metrics::{Counter, KeyValue, Meter}; + +/// Metrics recorded during garbage collection execution. 
+pub struct GcMetrics { + location_id: i64, + expired_files_found: Counter, + metadata_entries_deleted: Counter, + files_deleted: Counter, + files_not_found: Counter, +} + +impl GcMetrics { + /// Create a new metrics instance for a GC job. + pub fn new(meter: &Meter, location_id: i64) -> Self { + Self { + location_id, + expired_files_found: Counter::new( + meter, + "gc_expired_files_found", + "Number of expired files found in the GC manifest", + ), + metadata_entries_deleted: Counter::new( + meter, + "gc_metadata_entries_deleted", + "Number of file metadata entries deleted from Postgres", + ), + files_deleted: Counter::new( + meter, + "gc_files_deleted", + "Number of physical files deleted from object storage", + ), + files_not_found: Counter::new( + meter, + "gc_files_not_found", + "Number of expired files already missing from object storage", + ), + } + } + + fn kvs(&self) -> [KeyValue; 1] { + [KeyValue::new("location_id", self.location_id)] + } + + pub fn record_expired_files_found(&self, count: u64) { + self.expired_files_found.inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_metadata_entries_deleted(&self, count: u64) { + self.metadata_entries_deleted + .inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_files_deleted(&self, count: u64) { + self.files_deleted.inc_by_with_kvs(count, &self.kvs()); + } + + pub fn record_files_not_found(&self, count: u64) { + self.files_not_found.inc_by_with_kvs(count, &self.kvs()); + } +} diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index dd87f638e..3219f9953 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -14,6 +14,7 @@ amp-providers-registry = { path = "../../core/providers-registry" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = 
"../../core/worker-gc" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index c093d732d..2861e6e7e 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -217,6 +217,13 @@ impl From for JobDes } } +impl From for JobDescriptor { + fn from(desc: amp_worker_gc::job_descriptor::JobDescriptor) -> Self { + let raw: EventDetailOwned = desc.into(); + Self(raw.into_inner()) + } +} + /// Errors that can occur when scheduling a dataset dump job #[derive(Debug, thiserror::Error)] pub enum ScheduleJobError { diff --git a/crates/services/controller/Cargo.toml b/crates/services/controller/Cargo.toml index 5326f91c8..f3404dde9 100644 --- a/crates/services/controller/Cargo.toml +++ b/crates/services/controller/Cargo.toml @@ -6,10 +6,12 @@ license-file.workspace = true [dependencies] admin-api = { path = "../admin-api" } +amp-config = { path = "../../config" } amp-data-store = { path = "../../core/data-store" } amp-datasets-registry = { version = "0.1.0", path = "../../core/datasets-registry" } amp-providers-registry = { version = "0.1.0", path = "../../core/providers-registry" } amp-worker-core = { path = "../../core/worker-core" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/controller/src/lib.rs b/crates/services/controller/src/lib.rs index a06cb0a94..9c2c99f6d 100644 --- a/crates/services/controller/src/lib.rs +++ b/crates/services/controller/src/lib.rs @@ -2,7 +2,7 @@ //! //! The controller service provides the admin API for managing Amp operations. 
-mod scheduler; +pub mod scheduler; pub mod service; // Re-export build_info to avoid code duplication diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 84ebb8cbc..54a03e4fb 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -63,12 +63,49 @@ const DEAD_WORKER_INTERVAL: Duration = Duration::from_secs(5); /// Thread-safe for sharing across async tasks via `Arc`. pub struct Scheduler { metadata_db: MetadataDb, + gc_metrics: Option, +} + +/// Controller-level metrics for GC job scheduling. +struct GcSchedulerMetrics { + jobs_dispatched: monitoring::telemetry::metrics::Counter, + jobs_skipped_in_flight: monitoring::telemetry::metrics::Counter, + jobs_skipped_too_recent: monitoring::telemetry::metrics::Counter, +} + +impl GcSchedulerMetrics { + fn new(meter: &monitoring::telemetry::metrics::Meter) -> Self { + use monitoring::telemetry::metrics::Counter; + Self { + jobs_dispatched: Counter::new( + meter, + "gc_jobs_dispatched_total", + "Total number of GC jobs dispatched to workers", + ), + jobs_skipped_in_flight: Counter::new( + meter, + "gc_jobs_skipped_in_flight_total", + "Total number of GC scheduling attempts skipped because a job is already in-flight", + ), + jobs_skipped_too_recent: Counter::new( + meter, + "gc_jobs_skipped_too_recent_total", + "Total number of GC scheduling attempts skipped because last completion was too recent", + ), + } + } } impl Scheduler { /// Create a new scheduler instance - pub fn new(metadata_db: MetadataDb) -> Self { - Self { metadata_db } + pub fn new( + metadata_db: MetadataDb, + meter: Option<&monitoring::telemetry::metrics::Meter>, + ) -> Self { + Self { + metadata_db, + gc_metrics: meter.map(GcSchedulerMetrics::new), + } } /// Schedule a job with a pre-built descriptor @@ -356,6 +393,106 @@ impl Scheduler { Ok(()) } + + /// Schedule GC jobs for locations that have expired entries in the GC manifest. 
+ /// + /// Pre-filters by querying `gc_manifest` for distinct location IDs with expired + /// entries, then for each location: + /// + /// 1. Look up the existing job by idempotency key. + /// 2. If in-flight (non-terminal state), skip. + /// 3. If completed recently (`now - updated_at < gc_interval`), skip. + /// 4. Otherwise, schedule a new job. + /// + /// This includes both active and deactivated revisions — GC is only concerned + /// with the state of the GC manifest, not the revision's active status. + pub async fn schedule_gc_jobs(&self, gc_interval: Duration) -> Result<(), ScheduleGcJobsError> { + let locations = metadata_db::gc::locations_with_expired_entries(&self.metadata_db) + .await + .map_err(ScheduleGcJobsError::ListExpiredLocations)?; + + for location_id in locations { + let idempotency_key = amp_worker_gc::job_key::idempotency_key(location_id); + + // Check if a GC job already exists for this location + let existing = + metadata_db::jobs::get_by_idempotency_key(&self.metadata_db, &idempotency_key) + .await + .map_err(ScheduleGcJobsError::CheckExistingJob)?; + + if let Some(job) = &existing { + // Skip if job is still in-flight + if !job.status.is_terminal() { + if let Some(m) = &self.gc_metrics { + m.jobs_skipped_in_flight.inc_by(1); + } + continue; + } + + // Skip if last completion was too recent + if job.status == JobStatus::Completed.into() { + let now = std::time::SystemTime::now(); + let updated: std::time::SystemTime = job.updated_at.into(); + let elapsed = now.duration_since(updated).unwrap_or_default(); + if elapsed < gc_interval { + if let Some(m) = &self.gc_metrics { + m.jobs_skipped_too_recent.inc_by(1); + } + continue; + } + } + } + + let descriptor = admin_api::scheduler::JobDescriptor::from( + amp_worker_gc::job_descriptor::JobDescriptor { location_id }, + ); + + match self + .schedule_job_impl(idempotency_key, descriptor, None) + .await + { + Ok(job_id) => { + if let Some(m) = &self.gc_metrics { + m.jobs_dispatched.inc_by(1); + } + 
tracing::debug!( + %location_id, + %job_id, + "scheduled GC job" + ); + } + Err(ScheduleJobError::NoWorkersAvailable) => { + tracing::warn!("no workers available for GC scheduling"); + return Ok(()); + } + Err(ScheduleJobError::ActiveJobConflict { .. }) => { + // GC job already running for this location, skip + } + Err(err) => { + tracing::error!( + %location_id, + error = %err, + error_source = logging::error_source(&err), + "failed to schedule GC job" + ); + } + } + } + + Ok(()) + } +} + +/// Errors that occur when scheduling GC jobs [`Scheduler::schedule_gc_jobs`] +#[derive(Debug, thiserror::Error)] +pub enum ScheduleGcJobsError { + /// Failed to query locations with expired GC manifest entries + #[error("failed to list locations with expired GC entries")] + ListExpiredLocations(#[source] metadata_db::Error), + + /// Failed to check existing job for a location + #[error("failed to check existing GC job")] + CheckExistingJob(#[source] metadata_db::Error), } /// Errors that occur during failed job reconciliation [`Scheduler::reconcile_failed_jobs`] diff --git a/crates/services/controller/src/service.rs b/crates/services/controller/src/service.rs index 276d7e4c7..098aea9ec 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -42,10 +42,11 @@ pub async fn new( ethcall_udfs_cache: EthCallUdfsCache, meter: Option, at: SocketAddr, + gc_config: amp_config::GcSchedulingConfig, ) -> Result<(SocketAddr, impl Future>), Error> { let build_info = build_info.into(); - let scheduler = Arc::new(Scheduler::new(metadata_db.clone())); + let scheduler = Arc::new(Scheduler::new(metadata_db.clone(), meter.as_ref())); let ctx = Ctx { metadata_db, @@ -94,6 +95,28 @@ pub async fn new( } }); + // Optionally spawn background task for GC job scheduling + let gc_scheduling_task = gc_config.enabled.then(|| { + let scheduler_for_gc = scheduler.clone(); + let gc_interval = gc_config.interval_duration(); + + tokio::spawn(async move { + let mut 
interval = tokio::time::interval(gc_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + interval.tick().await; + if let Err(err) = scheduler_for_gc.schedule_gc_jobs(gc_interval).await { + tracing::error!( + error = %err, + error_source = monitoring::logging::error_source(&err), + "GC job scheduling failed" + ); + } + } + }) + }); + // The service future that runs the server with graceful shutdown let server = async move { let server_future = axum::serve(listener, router).with_graceful_shutdown(shutdown_signal()); @@ -105,6 +128,15 @@ pub async fn new( _ = reconciliation_task => { Err(ServerError::ReconciliationTerminated) } + _ = async { + if let Some(task) = gc_scheduling_task { + task.await.ok(); + } else { + std::future::pending::<()>().await; + } + } => { + Err(ServerError::GcSchedulingTerminated) + } } }; Ok((addr, server)) @@ -164,6 +196,14 @@ pub enum ServerError { /// - Task is cancelled externally #[error("Reconciliation task terminated unexpectedly")] ReconciliationTerminated, + + /// Background GC scheduling task terminated unexpectedly + /// + /// This occurs when: + /// - GC scheduling task panics + /// - Task is cancelled externally + #[error("GC scheduling task terminated unexpectedly")] + GcSchedulingTerminated, } /// Returns a future that completes when a shutdown signal is received. 
diff --git a/crates/services/worker/Cargo.toml b/crates/services/worker/Cargo.toml index 2bc0ea6ab..9d687b743 100644 --- a/crates/services/worker/Cargo.toml +++ b/crates/services/worker/Cargo.toml @@ -10,6 +10,7 @@ amp-data-store = { path = "../../core/data-store" } amp-worker-core = { path = "../../core/worker-core" } amp-worker-datasets-derived = { path = "../../core/worker-datasets-derived" } amp-worker-datasets-raw = { path = "../../core/worker-datasets-raw" } +amp-worker-gc = { path = "../../core/worker-gc" } async-trait.workspace = true backon.workspace = true chrono.workspace = true diff --git a/crates/services/worker/src/job.rs b/crates/services/worker/src/job.rs index a9988f48b..6efebf397 100644 --- a/crates/services/worker/src/job.rs +++ b/crates/services/worker/src/job.rs @@ -56,4 +56,5 @@ impl From for metadata_db::jobs::Job { pub enum JobDescriptor { MaterializeRaw(amp_worker_datasets_raw::job_descriptor::JobDescriptor), MaterializeDerived(amp_worker_datasets_derived::job_descriptor::JobDescriptor), + Gc(amp_worker_gc::job_descriptor::JobDescriptor), } diff --git a/crates/services/worker/src/service/job_impl.rs b/crates/services/worker/src/service/job_impl.rs index 68ddb364a..4854e0723 100644 --- a/crates/services/worker/src/service/job_impl.rs +++ b/crates/services/worker/src/service/job_impl.rs @@ -1,6 +1,6 @@ //! Internal job implementation for the worker service. -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use amp_worker_core::{ ProgressReporter, @@ -16,141 +16,159 @@ use crate::{ events::WorkerProgressReporter, job::JobDescriptor, kafka::proto, service::WorkerJobCtx, }; -/// Create and run a worker job that materializes tables from a dataset. +/// Create and run a worker job. /// -/// This function returns a future that executes the materialization operation. -/// Raw datasets are handled by `amp_worker_datasets_raw` and derived -/// datasets are handled by `amp_worker_datasets_derived`. 
-pub(super) fn new( +/// This function returns a future that executes the job operation. +/// Raw datasets are handled by `amp_worker_datasets_raw`, derived +/// datasets by `amp_worker_datasets_derived`, and garbage collection +/// by `amp_worker_gc`. +pub(super) async fn new( job_ctx: WorkerJobCtx, job_desc: JobDescriptor, job_id: JobId, -) -> impl Future> { - let reference = match &job_desc { - JobDescriptor::MaterializeRaw(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - JobDescriptor::MaterializeDerived(desc) => HashReference::new( - desc.dataset_namespace.clone(), - desc.dataset_name.clone(), - desc.manifest_hash.clone(), - ), - }; - - let metrics = job_ctx - .meter - .as_ref() - .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); - - // Create progress reporter for event streaming - // Always create the reporter - NoOpEmitter will discard events if not needed - let progress_reporter: Option> = { - let dataset_info = proto::DatasetInfo { - namespace: reference.namespace().to_string(), - name: reference.name().to_string(), - manifest_hash: reference.hash().to_string(), - }; - Some(Arc::new(WorkerProgressReporter::new( - job_id, - dataset_info, - job_ctx.event_emitter.clone(), - ))) - }; - - let writer: metadata_db::jobs::JobId = job_id.into(); - async move { - match job_desc { - JobDescriptor::MaterializeRaw(desc) => { - let ctx = amp_worker_datasets_raw::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_raw::job_ctx::Config { - poll_interval: job_ctx.config.poll_interval, - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - 
notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, +) -> Result<(), JobError> { + match job_desc { + JobDescriptor::Gc(desc) => { + let ctx = amp_worker_gc::job_ctx::Context { + metadata_db: job_ctx.metadata_db.clone(), + data_store: job_ctx.data_store.clone(), + meter: job_ctx.meter.clone(), + }; + + amp_worker_gc::job_impl::execute(ctx, desc) + .instrument(info_span!("gc_job", %job_id)) + .await + .map_err(JobError::Gc)?; + } + JobDescriptor::MaterializeRaw(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_raw::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_raw::job_ctx::Config { + poll_interval: job_ctx.config.poll_interval, + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_raw::job_impl::execute(ctx, desc, writer) .instrument( info_span!("materialize_raw_job", %job_id, dataset = 
%format!("{reference:#}")), ) .await .map_err(JobError::MaterializeRaw)?; - } - JobDescriptor::MaterializeDerived(desc) => { - let ctx = amp_worker_datasets_derived::job_ctx::Context { - job_id: Some(writer), - config: amp_worker_datasets_derived::job_ctx::Config { - keep_alive_interval: job_ctx.config.keep_alive_interval, - max_mem_mb: job_ctx.config.max_mem_mb, - query_max_mem_mb: job_ctx.config.query_max_mem_mb, - spill_location: job_ctx.config.spill_location.clone(), - progress_interval: job_ctx - .config - .events_config - .progress_interval - .clone() - .into(), - parquet_writer: (&job_ctx.config.parquet).into(), - microbatch_max_interval: job_ctx.config.microbatch_max_interval, - }, - metadata_db: job_ctx.metadata_db.clone(), - datasets_cache: job_ctx.datasets_cache.clone(), - ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), - data_store: job_ctx.data_store.clone(), - isolate_pool: job_ctx.isolate_pool.clone(), - notification_multiplexer: job_ctx.notification_multiplexer.clone(), - metrics, - progress_reporter, + } + JobDescriptor::MaterializeDerived(desc) => { + let reference = HashReference::new( + desc.dataset_namespace.clone(), + desc.dataset_name.clone(), + desc.manifest_hash.clone(), + ); + let metrics = job_ctx + .meter + .as_ref() + .map(|m| Arc::new(MetricsRegistry::new(m, reference.clone(), *job_id))); + let progress_reporter: Option> = { + let dataset_info = proto::DatasetInfo { + namespace: reference.namespace().to_string(), + name: reference.name().to_string(), + manifest_hash: reference.hash().to_string(), }; + Some(Arc::new(WorkerProgressReporter::new( + job_id, + dataset_info, + job_ctx.event_emitter.clone(), + ))) + }; + let writer: metadata_db::jobs::JobId = job_id.into(); - amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) + let ctx = amp_worker_datasets_derived::job_ctx::Context { + job_id: Some(writer), + config: amp_worker_datasets_derived::job_ctx::Config { + keep_alive_interval: 
job_ctx.config.keep_alive_interval, + max_mem_mb: job_ctx.config.max_mem_mb, + query_max_mem_mb: job_ctx.config.query_max_mem_mb, + spill_location: job_ctx.config.spill_location.clone(), + progress_interval: job_ctx + .config + .events_config + .progress_interval + .clone() + .into(), + parquet_writer: (&job_ctx.config.parquet).into(), + microbatch_max_interval: job_ctx.config.microbatch_max_interval, + }, + metadata_db: job_ctx.metadata_db.clone(), + datasets_cache: job_ctx.datasets_cache.clone(), + ethcall_udfs_cache: job_ctx.ethcall_udfs_cache.clone(), + data_store: job_ctx.data_store.clone(), + isolate_pool: job_ctx.isolate_pool.clone(), + notification_multiplexer: job_ctx.notification_multiplexer.clone(), + metrics, + progress_reporter, + }; + + amp_worker_datasets_derived::job_impl::execute(ctx, desc, writer) .instrument(info_span!("materialize_derived_job", %job_id, dataset = %format!("{reference:#}"))) .await .map_err(JobError::MaterializeDerived)?; - } } - - Ok(()) } + + Ok(()) } -/// Errors from dataset materialization job execution. +/// Errors from worker job execution. /// -/// Wraps the specific error types from raw and derived dataset materialization operations -/// to provide a unified error type for the worker job system. +/// Wraps the specific error types from each job type to provide a unified +/// error type for the worker job system. #[derive(Debug, thiserror::Error)] pub(crate) enum JobError { - /// Raw dataset materialization operation failed - /// - /// This occurs when the raw dataset extraction and Parquet file writing - /// process encounters an error. Common causes include blockchain client - /// connectivity issues, consistency check failures, and partition task errors. + /// Raw dataset materialization operation failed. 
#[error("Failed to materialize raw dataset")] MaterializeRaw(#[source] amp_worker_datasets_raw::job_impl::Error), - /// Derived dataset materialization operation failed - /// - /// This occurs when the derived dataset SQL query execution and Parquet - /// file writing process encounters an error. Common causes include query - /// environment creation failures, manifest retrieval errors, and table - /// materialization failures. + /// Derived dataset materialization operation failed. #[error("Failed to materialize derived dataset")] MaterializeDerived(#[source] amp_worker_datasets_derived::job_impl::Error), + + /// Garbage collection job failed. + #[error("Failed to run garbage collection")] + Gc(#[source] amp_worker_gc::error::Error), } impl JobErrorExt for JobError { @@ -158,6 +176,7 @@ impl JobErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.error_code(), Self::MaterializeDerived(err) => err.error_code(), + Self::Gc(err) => err.error_code(), } } } @@ -167,6 +186,7 @@ impl ErrorDetailsProvider for JobError { match self { Self::MaterializeRaw(err) => Some(err), Self::MaterializeDerived(err) => Some(err), + Self::Gc(err) => Some(err), } } } @@ -176,6 +196,7 @@ impl RetryableErrorExt for JobError { match self { Self::MaterializeRaw(err) => err.is_retryable(), Self::MaterializeDerived(err) => err.is_retryable(), + Self::Gc(err) => err.is_retryable(), } } } diff --git a/docs/config.sample.toml b/docs/config.sample.toml index b403214bb..1e0c26832 100644 --- a/docs/config.sample.toml +++ b/docs/config.sample.toml @@ -105,6 +105,13 @@ url = "postgres://" # min_interval = 30.0 # Interval in seconds to run the garbage collector (default: 30.0) # deletion_lock_duration = 1800.0 # Duration in seconds to hold deletion lock on compacted files (default: 1800.0 = 30 minutes) +# Controller-managed garbage collection scheduling +# When enabled, the controller schedules GC jobs via the job ledger for workers to execute. 
+# This is separate from the legacy worker-internal GC ([writer.collector]). +[gc_scheduling] +# enabled = false # Enable GC job scheduling (default: false) +# interval = 60.0 # Interval in seconds between GC scheduling sweeps (default: 60.0) + # Worker configuration [worker_events] # enabled = false # Enable event streaming to Kafka (default: false) diff --git a/docs/schemas/config/ampd.spec.json b/docs/schemas/config/ampd.spec.json index b231ed9b2..3260177e1 100644 --- a/docs/schemas/config/ampd.spec.json +++ b/docs/schemas/config/ampd.spec.json @@ -23,6 +23,9 @@ "null" ] }, + "gc_scheduling": { + "$ref": "#/$defs/GcSchedulingConfig" + }, "jsonl_addr": { "description": "JSON Lines server address (default: \"0.0.0.0:1603\")", "type": [ @@ -202,6 +205,21 @@ "description": "Duration in seconds (floating-point)", "type": "number" }, + "GcSchedulingConfig": { + "description": "Configuration for controller-managed garbage collection scheduling.\n\nControls whether the controller schedules GC jobs and at what interval.\nGC jobs are executed by workers, not the controller.", + "type": "object", + "properties": { + "enabled": { + "description": "Enable GC job scheduling (default: `false`).\n\nWhen `false`, the controller will not schedule any GC jobs.\nEnable only after verifying the legacy worker-internal GC is disabled.", + "type": "boolean", + "default": false + }, + "interval": { + "description": "Interval in seconds between GC scheduling sweeps (default: 60).\n\nThe controller scans for active physical table revisions at this\ninterval and schedules GC jobs for each one.", + "$ref": "#/$defs/ConfigDuration" + } + } + }, "KafkaEventsConfig": { "description": "Kafka configuration for worker events.", "type": "object", diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 85eaff59a..67ba10350 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -22,6 +22,7 @@ amp-parquet = { path = "../crates/core/parquet" } amp-providers-registry = { path = 
"../crates/core/providers-registry" } amp-providers-solana = { path = "../crates/core/providers-solana" } amp-worker-core = { path = "../crates/core/worker-core" } +amp-worker-gc = { path = "../crates/core/worker-gc" } amp-client-admin = { package = "admin-client", path = "../crates/clients/admin" } ampctl = { path = "../crates/bin/ampctl" } anyhow.workspace = true diff --git a/tests/src/testlib/fixtures/daemon_controller.rs b/tests/src/testlib/fixtures/daemon_controller.rs index d8d93058e..cb2b05f0b 100644 --- a/tests/src/testlib/fixtures/daemon_controller.rs +++ b/tests/src/testlib/fixtures/daemon_controller.rs @@ -59,6 +59,7 @@ impl DaemonController { ethcall_udfs_cache, meter, admin_api_addr, + amp_config::GcSchedulingConfig::default(), ) .await?; diff --git a/tests/src/tests/it_gc.rs b/tests/src/tests/it_gc.rs new file mode 100644 index 000000000..71585007e --- /dev/null +++ b/tests/src/tests/it_gc.rs @@ -0,0 +1,478 @@ +//! Integration tests for GC job execution and scheduling. +//! +//! These tests verify: +//! - The full collection algorithm: expired file streaming, metadata deletion, +//! and physical file deletion. +//! - The GC manifest pre-filter query used by the controller scheduler. + +use std::time::Duration; + +use amp_data_store::DataStore; +use amp_worker_core::jobs::job_id::JobId; +use amp_worker_gc::{job_ctx::Context, job_descriptor::JobDescriptor}; +use futures::TryStreamExt; +use metadata_db::{ + datasets::{DatasetName, DatasetNamespace}, + files::{self, FileName}, + gc, physical_table, + physical_table_revision::{self, LocationId, TablePath}, +}; +use tempfile::TempDir; +use url::Url; + +use crate::testlib::fixtures::MetadataDb as MetadataDbFixture; + +/// Test context for GC tests. +/// +/// Wraps the shared test fixtures (metadata DB, test directory) and provides +/// convenience helpers for setting up test data. 
+struct GcTestCtx { + conn: metadata_db::MetadataDb, + test_dir: TempDir, + _metadata_db: MetadataDbFixture, +} + +impl GcTestCtx { + /// Create a new GC test context with an isolated Postgres instance and temp directory. + async fn setup(test_name: &str) -> Self { + monitoring::logging::init(); + + let metadata_db = MetadataDbFixture::new().await; + let conn = metadata_db.conn_pool().clone(); + + // Register a worker (required for foreign key constraints on job tables) + let worker_id = metadata_db::workers::WorkerNodeId::from_ref_unchecked("gc-test-worker"); + let raw: Box = + serde_json::from_str("{}").expect("empty JSON should be valid"); + let worker_info = metadata_db::workers::WorkerInfo::from_owned_unchecked(raw); + metadata_db::workers::register(&conn, worker_id, worker_info) + .await + .expect("failed to register worker"); + + let test_dir = tempfile::Builder::new() + .prefix(&format!("gc_test__{test_name}__")) + .tempdir() + .expect("failed to create test dir"); + + Self { + conn, + test_dir, + _metadata_db: metadata_db, + } + } + + /// Create a DataStore backed by the test directory's filesystem. + fn data_store(&self) -> DataStore { + let data_url = amp_object_store::url::ObjectStoreUrl::new(format!( + "file://{}/", + self.test_dir.path().display() + )) + .expect("failed to create object store URL"); + + DataStore::new(self.conn.clone(), data_url, 0).expect("failed to create data store") + } + + /// Register a physical table and revision, returning its location ID. 
+ async fn register_revision(&self, path: &str) -> LocationId { + let namespace = DatasetNamespace::from_ref_unchecked("test_ns"); + let name = DatasetName::from_ref_unchecked("test_dataset"); + let hash = metadata_db::manifests::ManifestHash::from_ref_unchecked( + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ); + let table_name = physical_table::TableName::from_ref_unchecked("test_table"); + + physical_table::register(&self.conn, &namespace, &name, &hash, &table_name) + .await + .expect("failed to register physical table"); + + let metadata_json = serde_json::json!({ + "dataset_namespace": "test_ns", + "dataset_name": "test_dataset", + "manifest_hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "table_name": "test_table", + }); + let raw = serde_json::value::to_raw_value(&metadata_json) + .expect("test metadata should serialize"); + let metadata = physical_table_revision::RevisionMetadata::from_owned_unchecked(raw); + let table_path = TablePath::from_ref_unchecked(path); + + physical_table_revision::register(&self.conn, table_path, metadata) + .await + .expect("failed to register revision") + } + + /// Register a file in the metadata database for a given location. + async fn register_file(&self, location_id: LocationId, file_name: &str, url: &Url) { + files::register( + &self.conn, + location_id, + url, + FileName::from_ref_unchecked(file_name), + 100, + None, + None, + serde_json::json!({}), + &vec![0u8; 10], + ) + .await + .expect("failed to register file"); + } + + /// Get all file IDs for a given location. + async fn file_ids(&self, location_id: LocationId) -> Vec { + files::stream_by_location_id_with_details(&self.conn, location_id) + .map_ok(|f| f.id) + .try_collect() + .await + .expect("failed to stream files") + } + + /// Build a GC execution context. 
+ fn gc_context(&self) -> Context { + Context { + metadata_db: self.conn.clone(), + data_store: self.data_store(), + meter: None, + } + } + + /// Set up a location with an expired GC entry and return its location ID and idempotency key. + /// + /// Registers a revision, a file, upserts a 1s GC entry, and waits for expiry. + async fn setup_expired_location( + &self, + path: &str, + file_name: &str, + ) -> (LocationId, metadata_db::jobs::IdempotencyKey<'static>) { + let loc = self.register_revision(path).await; + let url = Url::parse(&format!("file:///tmp/{file_name}")).unwrap(); + self.register_file(loc, file_name, &url).await; + let file_ids = self.file_ids(loc).await; + gc::upsert(&self.conn, loc, &file_ids, Duration::from_secs(1)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; + let key = amp_worker_gc::job_key::idempotency_key(loc); + (loc, key) + } + + /// Transition a job from Scheduled → Running → Completed. + async fn complete_job(&self, job_id: JobId) { + metadata_db::job_status::mark_running(&self.conn, job_id) + .await + .expect("failed to mark running"); + metadata_db::job_status::mark_completed(&self.conn, job_id) + .await + .expect("failed to mark completed"); + } +} + +// --- GC execution tests --- + +#[tokio::test] +async fn gc_deletes_expired_files() { + //* Given + let ctx = GcTestCtx::setup("gc_deletes_expired").await; + + let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; + let location_id = ctx.register_revision(revision_path).await; + + // Create physical files on disk + let revision_dir = ctx.test_dir.path().join(revision_path); + std::fs::create_dir_all(&revision_dir).expect("failed to create revision dir"); + std::fs::write(revision_dir.join("file_a.parquet"), b"test data a") + .expect("failed to write file_a"); + std::fs::write(revision_dir.join("file_b.parquet"), b"test data b") + .expect("failed to write file_b"); + + // Register file metadata in Postgres + let base_url = Url::parse(&format!( + 
"file://{}/{}/", + ctx.test_dir.path().display(), + revision_path, + )) + .expect("failed to parse URL"); + + ctx.register_file( + location_id, + "file_a.parquet", + &base_url.join("file_a.parquet").unwrap(), + ) + .await; + ctx.register_file( + location_id, + "file_b.parquet", + &base_url.join("file_b.parquet").unwrap(), + ) + .await; + + let file_ids = ctx.file_ids(location_id).await; + assert_eq!(file_ids.len(), 2, "should have 2 registered files"); + + // Schedule files for GC with 1s duration (minimum allowed by DB constraint) + gc::upsert(&ctx.conn, location_id, &file_ids, Duration::from_secs(1)) + .await + .expect("failed to upsert GC manifest"); + + // Wait for the files to expire + tokio::time::sleep(Duration::from_secs(2)).await; + + //* When + amp_worker_gc::job_impl::execute(ctx.gc_context(), JobDescriptor { location_id }) + .await + .expect("GC execution failed"); + + //* Then + let remaining_files = ctx.file_ids(location_id).await; + assert!( + remaining_files.is_empty(), + "all file metadata should be deleted" + ); + + let expired: Vec = gc::stream_expired(&ctx.conn, location_id) + .try_collect() + .await + .expect("failed to stream expired"); + assert!(expired.is_empty(), "GC manifest should be empty"); + + assert!( + !revision_dir.join("file_a.parquet").exists(), + "file_a.parquet should be deleted from disk" + ); + assert!( + !revision_dir.join("file_b.parquet").exists(), + "file_b.parquet should be deleted from disk" + ); +} + +#[tokio::test] +async fn gc_with_no_expired_files_is_a_noop() { + //* Given + let ctx = GcTestCtx::setup("gc_noop").await; + + let revision_path = "test_ns/test_dataset/aaaaaaaaaa/test_table"; + let location_id = ctx.register_revision(revision_path).await; + + //* When + amp_worker_gc::job_impl::execute(ctx.gc_context(), JobDescriptor { location_id }) + .await + .expect("GC execution should succeed with no expired files"); + + //* Then — no errors, no panics +} + +#[tokio::test] +async fn 
gc_with_nonexistent_location_returns_error() { + //* Given + let ctx = GcTestCtx::setup("gc_nonexistent").await; + + let desc = JobDescriptor { + location_id: LocationId::try_from(999999i64).unwrap(), + }; + + //* When + let result = amp_worker_gc::job_impl::execute(ctx.gc_context(), desc).await; + + //* Then + assert!(result.is_err(), "should fail with LocationNotFound"); + assert!( + matches!( + result.unwrap_err(), + amp_worker_gc::error::Error::LocationNotFound(_) + ), + "error should be LocationNotFound" + ); +} + +// --- Scheduler tests --- +// These test the controller's schedule_gc_jobs() method to verify it only +// schedules GC jobs for locations that actually have expired entries. + +#[tokio::test] +async fn scheduler_only_schedules_gc_for_locations_with_expired_entries() { + //* Given + let ctx = GcTestCtx::setup("gc_scheduler_prefilter").await; + + let loc_a = ctx + .register_revision("test_ns/ds_a/aaaaaaaaaa/table_a") + .await; + let loc_b = ctx + .register_revision("test_ns/ds_b/aaaaaaaaaa/table_b") + .await; + + let url_a = Url::parse("file:///tmp/a.parquet").unwrap(); + let url_b = Url::parse("file:///tmp/b.parquet").unwrap(); + ctx.register_file(loc_a, "a.parquet", &url_a).await; + ctx.register_file(loc_b, "b.parquet", &url_b).await; + + let file_ids_a = ctx.file_ids(loc_a).await; + let file_ids_b = ctx.file_ids(loc_b).await; + + // loc_a: 1s expiry (will expire), loc_b: 1h expiry (won't expire) + gc::upsert(&ctx.conn, loc_a, &file_ids_a, Duration::from_secs(1)) + .await + .unwrap(); + gc::upsert(&ctx.conn, loc_b, &file_ids_b, Duration::from_secs(3600)) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + + //* When + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("schedule_gc_jobs should succeed"); + + //* Then — only loc_a should have a job scheduled, not loc_b + let key_a = 
amp_worker_gc::job_key::idempotency_key(loc_a); + let key_b = amp_worker_gc::job_key::idempotency_key(loc_b); + + let job_a = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key_a) + .await + .expect("failed to query job for loc_a"); + let job_b = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key_b) + .await + .expect("failed to query job for loc_b"); + + assert!( + job_a.is_some(), + "scheduler should have created a job for loc_a (expired entries)" + ); + assert!( + job_b.is_none(), + "scheduler should NOT have created a job for loc_b (no expired entries)" + ); +} + +#[tokio::test] +async fn scheduler_schedules_gc_for_deactivated_revisions() { + //* Given + let ctx = GcTestCtx::setup("gc_scheduler_deactivated").await; + + // Register a revision (inactive by default — never called mark_active_by_id) + let loc = ctx + .register_revision("test_ns/ds_inactive/aaaaaaaaaa/table_inactive") + .await; + + let url = Url::parse("file:///tmp/inactive.parquet").unwrap(); + ctx.register_file(loc, "inactive.parquet", &url).await; + + let file_ids = ctx.file_ids(loc).await; + + gc::upsert(&ctx.conn, loc, &file_ids, Duration::from_secs(1)) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(2)).await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + + //* When + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("schedule_gc_jobs should succeed"); + + //* Then — deactivated revision should still get a GC job + let key = amp_worker_gc::job_key::idempotency_key(loc); + let job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job"); + + assert!( + job.is_some(), + "scheduler should schedule GC for deactivated revisions" + ); +} + +#[tokio::test] +async fn scheduler_skips_in_flight_gc_jobs() { + //* Given + let ctx = GcTestCtx::setup("gc_scheduler_in_flight").await; + + let (_loc, key) = ctx + .setup_expired_location( + 
"test_ns/ds_inflight/aaaaaaaaaa/table_inflight", + "inflight.parquet", + ) + .await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + + // First call: creates a Scheduled job + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("first schedule_gc_jobs should succeed"); + + let first_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should exist after first scheduling"); + + //* When — call again while the job is still in-flight (Scheduled) + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("second schedule_gc_jobs should succeed"); + + //* Then — the job should be unchanged (same ID, still Scheduled) + let second_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should still exist"); + + assert_eq!( + first_job.id, second_job.id, + "scheduler should not create a duplicate job while one is in-flight" + ); +} + +#[tokio::test] +async fn scheduler_skips_recently_completed_gc_jobs() { + //* Given + let ctx = GcTestCtx::setup("gc_scheduler_too_recent").await; + + let (_loc, key) = ctx + .setup_expired_location( + "test_ns/ds_recent/aaaaaaaaaa/table_recent", + "recent.parquet", + ) + .await; + + let scheduler = controller::scheduler::Scheduler::new(ctx.conn.clone(), None); + + // Schedule and complete a GC job + scheduler + .schedule_gc_jobs(Duration::from_secs(0)) + .await + .expect("initial schedule_gc_jobs should succeed"); + + let job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should exist"); + + ctx.complete_job(job.id.into()).await; + + //* When — schedule again with the default gc_interval (60s) + scheduler + .schedule_gc_jobs(Duration::from_secs(60)) + .await + .expect("schedule_gc_jobs should succeed"); + + //* Then — the job should still be in Completed state, not 
rescheduled + let after_job = metadata_db::jobs::get_by_idempotency_key(&ctx.conn, &key) + .await + .expect("failed to query job") + .expect("job should still exist"); + + assert_eq!( + after_job.status, + amp_worker_core::jobs::status::JobStatus::Completed.into(), + "scheduler should not reschedule a recently completed GC job" + ); +} diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index 875e6a716..22db01155 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -16,6 +16,7 @@ mod it_dependencies; mod it_dump; mod it_functions; mod it_functions_eth_call; +mod it_gc; mod it_joins; mod it_multi_network_batch; mod it_multi_table_continuous;