Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 73 additions & 8 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,26 +660,41 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
get_resolvers(&config.storage_configs, &config.metastore_configs);
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
let mut index_service = IndexService::new(metastore, storage_resolver);

if quickwit_common::is_metrics_index(&args.index_id) {
let removal_info = index_service
.garbage_collect_parquet_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
return print_parquet_gc_result(args.dry_run, removal_info);
}

let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
print_tantivy_gc_result(args.dry_run, removal_info)
}

fn print_tantivy_gc_result(
dry_run: bool,
removal_info: quickwit_index_management::SplitRemovalInfo,
) -> anyhow::Result<()> {
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
println!("No dangling files to garbage collect.");
return Ok(());
}

if args.dry_run {
if dry_run {
println!("The following files will be garbage collected.");
for split_info in removal_info.removed_split_entries {
println!(" - {}", split_info.file_name.display());
for entry in &removal_info.removed_split_entries {
println!(" - {}", entry.file_name.display());
}
return Ok(());
}

if !removal_info.failed_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split_info in &removal_info.failed_splits {
println!(" - {}", split_info.split_id);
for split in &removal_info.failed_splits {
println!(" - {}", split.split_id);
}
println!(
"{} Splits were unable to be removed.",
Expand All @@ -690,7 +705,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let deleted_bytes: u64 = removal_info
.removed_split_entries
.iter()
.map(|split_info| split_info.file_size_bytes.as_u64())
.map(|s| s.file_size_bytes.as_u64())
.sum();
println!(
"{}MB of storage garbage collected.",
Expand All @@ -702,9 +717,59 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_split_entries.is_empty()
&& !removal_info.failed_splits.is_empty()
} else if removal_info.removed_split_entries.is_empty() {
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
println!(
"{} Index partially garbage collected.",
"✘".color(RED_COLOR)
);
}

Ok(())
}

fn print_parquet_gc_result(
dry_run: bool,
removal_info: quickwit_index_management::ParquetSplitRemovalInfo,
) -> anyhow::Result<()> {
if removal_info.removed_parquet_splits_entries.is_empty()
&& removal_info.failed_parquet_splits.is_empty()
{
println!("No dangling files to garbage collect.");
return Ok(());
}

if dry_run {
println!("The following files will be garbage collected.");
for entry in &removal_info.removed_parquet_splits_entries {
println!(" - {}.parquet", entry.split_id);
}
return Ok(());
}

if !removal_info.failed_parquet_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split in &removal_info.failed_parquet_splits {
println!(" - {}", split.split_id);
}
println!(
"{} Splits were unable to be removed.",
removal_info.failed_parquet_splits.len()
);
}

println!(
"{}MB of storage garbage collected.",
removal_info.removed_bytes() / 1_000_000
);

if removal_info.failed_parquet_splits.is_empty() {
println!(
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_parquet_splits_entries.is_empty() {
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
println!(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-index-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-parquet-engine = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }

Expand Down
120 changes: 80 additions & 40 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct GcMetrics {
pub failed_splits: IntCounter,
}

trait RecordGcMetrics {
/// Trait for recording the outcome of a garbage-collection pass
/// (deleted split count, deleted bytes, failed split count).
pub(crate) trait RecordGcMetrics {
    /// Records the result of one GC pass.
    ///
    /// * `num_delete_splits` - number of splits successfully deleted.
    /// * `num_deleted_bytes` - total bytes of storage reclaimed.
    /// * `num_failed_splits` - number of splits whose deletion failed.
    fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
}

Expand All @@ -72,7 +72,7 @@ pub struct DeleteSplitsError {
metastore_failures: Vec<SplitInfo>,
}

async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
pub(crate) async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
where Fut: Future<Output = T> {
match progress {
None => future.await,
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn list_splits_metadata(

/// In order to avoid hammering the load on the metastore, we can throttle the rate of split
/// deletion by setting this environment variable.
fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
pub(crate) fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
static MAX_SPLIT_DELETION_RATE_PER_SEC: LazyLock<Option<usize>> = LazyLock::new(|| {
quickwit_common::get_from_env_opt::<usize>("QW_MAX_SPLIT_DELETION_RATE_PER_SEC", false)
});
Expand Down Expand Up @@ -408,6 +408,48 @@ async fn delete_splits_marked_for_deletion_several_indexes(
split_removal_info
}

/// A split normalized for storage deletion: just the id, path, and size.
/// Used as the common currency between tantivy and parquet GC paths.
pub(crate) struct SplitToDelete {
    /// Identifier of the split.
    pub split_id: String,
    /// Storage path of the split file to delete.
    pub path: PathBuf,
    /// Size of the split file in bytes.
    pub size_bytes: u64,
}

/// Deletes the given split files from storage and partitions the input into
/// the splits whose files were removed and those whose removal failed.
///
/// Returns `(succeeded, failed, bulk_delete_error)`. The `BulkDeleteError` is
/// surfaced on partial failure so the caller can log it with index-specific
/// context. This function never touches the metastore.
pub(crate) async fn delete_split_files(
    storage: &dyn Storage,
    splits: Vec<SplitToDelete>,
    progress_opt: Option<&Progress>,
) -> (
    Vec<SplitToDelete>,
    Vec<SplitToDelete>,
    Option<BulkDeleteError>,
) {
    // Nothing to delete: report an empty, fully-successful outcome.
    if splits.is_empty() {
        return (Vec::new(), Vec::new(), None);
    }
    let split_paths: Vec<&Path> = splits.iter().map(|split| split.path.as_path()).collect();
    let delete_outcome = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;

    if let Some(progress) = progress_opt {
        progress.record_progress();
    }
    let bulk_error = match delete_outcome {
        // Full success: every input split was deleted.
        Ok(()) => return (splits, Vec::new(), None),
        Err(bulk_error) => bulk_error,
    };
    // Partial failure: split the inputs according to which paths the storage
    // layer reported as successfully deleted.
    let deleted_paths: HashSet<&PathBuf> = bulk_error.successes.iter().collect();
    let (succeeded, failed): (Vec<SplitToDelete>, Vec<SplitToDelete>) = splits
        .into_iter()
        .partition(|split| deleted_paths.contains(&split.path));
    (succeeded, failed, Some(bulk_error))
}

/// Delete a list of splits from the storage and the metastore.
/// It should leave the index and the metastore in good state.
///
Expand All @@ -424,49 +466,47 @@ pub async fn delete_splits_from_storage_and_metastore(
progress_opt: Option<&Progress>,
) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());

for split in splits {
let split_info = split.as_split_info();
split_infos.insert(split_info.file_name.clone(), split_info);
}
let split_paths = split_infos
.keys()
.map(|split_path_buf| split_path_buf.as_path())
.collect::<Vec<&Path>>();
let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;

if let Some(progress) = progress_opt {
progress.record_progress();
}
let mut successes = Vec::with_capacity(split_infos.len());
let splits_to_delete: Vec<SplitToDelete> = split_infos
.values()
.map(|info| SplitToDelete {
split_id: info.split_id.clone(),
path: info.file_name.clone(),
size_bytes: info.file_size_bytes.as_u64(),
})
.collect();

let (succeeded_stds, failed_stds, storage_err) =
delete_split_files(&*storage, splits_to_delete, progress_opt).await;

let successes: Vec<SplitInfo> = succeeded_stds
.iter()
.map(|s| split_infos[&s.path].clone())
.collect();
let storage_failures: Vec<SplitInfo> = failed_stds
.iter()
.map(|s| split_infos[&s.path].clone())
.collect();

let mut storage_error: Option<BulkDeleteError> = None;
let mut storage_failures = Vec::new();

match delete_result {
Ok(_) => successes.extend(split_infos.into_values()),
Err(bulk_delete_error) => {
let success_split_paths: HashSet<&PathBuf> =
bulk_delete_error.successes.iter().collect();
for (split_path, split_info) in split_infos {
if success_split_paths.contains(&split_path) {
successes.push(split_info);
} else {
storage_failures.push(split_info);
}
}
let failed_split_paths = storage_failures
.iter()
.map(|split_info| split_info.file_name.as_path())
.collect::<Vec<_>>();
error!(
error=?bulk_delete_error.error,
index_id=index_uid.index_id,
"failed to delete split file(s) {:?} from storage",
PrettySample::new(&failed_split_paths, 5),
);
storage_error = Some(bulk_delete_error);
}
};
if let Some(bulk_delete_error) = storage_err {
let failed_split_paths = storage_failures
.iter()
.map(|split_info| split_info.file_name.as_path())
.collect::<Vec<_>>();
error!(
error=?bulk_delete_error.error,
index_id=index_uid.index_id,
"failed to delete split file(s) {:?} from storage",
PrettySample::new(&failed_split_paths, 5),
);
storage_error = Some(bulk_delete_error);
}

if !successes.is_empty() {
let split_ids: Vec<SplitId> = successes
.iter()
Expand Down
42 changes: 42 additions & 0 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::garbage_collection::{
DeleteSplitsError, SplitRemovalInfo, delete_splits_from_storage_and_metastore,
run_garbage_collect,
};
use crate::parquet_garbage_collection::{ParquetSplitRemovalInfo, run_parquet_garbage_collect};

#[derive(Error, Debug)]
pub enum IndexServiceError {
Expand Down Expand Up @@ -405,6 +406,47 @@ impl IndexService {
Ok(deleted_entries)
}

/// Detect all dangling parquet splits and associated files from a metrics index
/// and removes them.
///
/// * `index_id` - The target metrics index Id.
/// * `grace_period` - Threshold period after which a staged split can be garbage collected.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
/// Detects all dangling parquet splits and associated files from a metrics
/// index and removes them.
///
/// * `index_id` - The target metrics index Id.
/// * `grace_period` - Threshold period after which a staged split can be garbage collected.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn garbage_collect_parquet_index(
    &mut self,
    index_id: &str,
    grace_period: Duration,
    dry_run: bool,
) -> anyhow::Result<ParquetSplitRemovalInfo> {
    let metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
    let metadata = self
        .metastore
        .index_metadata(metadata_request)
        .await?
        .deserialize_index_metadata()?;
    let index_uid = metadata.index_uid.clone();
    let index_config = metadata.into_index_config();
    let storage = self
        .storage_resolver
        .resolve(&index_config.index_uri)
        .await?;

    let mut index_storages = HashMap::new();
    index_storages.insert(index_uid, storage);

    // A deletion grace period of zero makes this CLI-driven call delete
    // splits directly after marking them for deletion.
    let removal_info = run_parquet_garbage_collect(
        index_storages,
        self.metastore.clone(),
        grace_period,
        Duration::ZERO,
        dry_run,
        None,
        None,
    )
    .await?;

    Ok(removal_info)
}

/// Clears the index by applying the following actions:
/// - mark all splits for deletion in the metastore.
/// - delete the files of all splits marked for deletion using garbage collection.
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-index-management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

mod garbage_collection;
mod index;
mod parquet_garbage_collection;

pub use garbage_collection::{GcMetrics, run_garbage_collect};
pub use garbage_collection::{GcMetrics, SplitRemovalInfo, run_garbage_collect};
pub use index::{IndexService, IndexServiceError, clear_cache_directory, validate_storage_uri};
pub use parquet_garbage_collection::{
ParquetSplitInfo, ParquetSplitRemovalInfo, run_parquet_garbage_collect,
};
Loading
Loading