Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/bin/ampctl/src/cmd/manifest/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ fn generate_evm_rpc_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -204,6 +205,7 @@ fn generate_solana_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -234,6 +236,7 @@ fn generate_firehose_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down Expand Up @@ -264,6 +267,7 @@ fn generate_tempo_manifest(
let manifest_table = ManifestTable {
schema,
network: network.clone(),
bloom_filter_columns: Vec::new(),
};
(table.name().to_string(), manifest_table)
})
Expand Down
5 changes: 0 additions & 5 deletions crates/config/src/worker_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ pub struct ParquetConfig {
/// Compression algorithm (default: `zstd(1)`).
#[serde(default)]
pub compression: Compression,
/// Enable Parquet bloom filters (default: false).
#[serde(default)]
pub bloom_filters: bool,
/// Parquet metadata cache size in MB (default: 1024).
#[serde(default = "default_cache_size_mb")]
pub cache_size_mb: u64,
Expand Down Expand Up @@ -48,7 +45,6 @@ impl Default for ParquetConfig {
fn default() -> Self {
Self {
compression: Compression::default(),
bloom_filters: false,
cache_size_mb: default_cache_size_mb(),
max_row_group_mb: default_max_row_group_mb(),
target_size: SizeLimitConfig::default(),
Expand All @@ -71,7 +67,6 @@ impl From<&ParquetConfig> for amp_worker_core::ParquetConfig {
fn from(config: &ParquetConfig) -> Self {
Self {
compression: (&config.compression).into(),
bloom_filters: config.bloom_filters,
cache_size_mb: config.cache_size_mb,
max_row_group_mb: config.max_row_group_mb,
target_size: (&config.target_size).into(),
Expand Down
9 changes: 8 additions & 1 deletion crates/core/datasets-common/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use downcast_rs::{DowncastSync, impl_downcast};

use crate::{
block_num::BlockNum, dataset_kind_str::DatasetKindStr, hash_reference::HashReference,
network_id::NetworkId, table_name::TableName,
manifest::BloomFilterColumnConfig, network_id::NetworkId, table_name::TableName,
};

/// Core trait representing a dataset definition.
Expand Down Expand Up @@ -95,6 +95,13 @@ pub trait Table: DowncastSync + std::fmt::Debug {

/// Returns column names by which this table is naturally sorted. Always includes `_block_num`.
fn sorted_by(&self) -> &BTreeSet<String>;

/// Returns the bloom filter column configurations for this table.
///
/// Empty by default — tables without bloom filter config get no bloom filters.
/// Implementors that carry per-column bloom filter settings (e.g. copied from a
/// dataset manifest) override this; callers use the returned configs to build
/// per-table Parquet writer properties.
fn bloom_filter_columns(&self) -> &[BloomFilterColumnConfig] {
    &[]
}
}

// Implement downcasting for `Table`.
Expand Down
19 changes: 19 additions & 0 deletions crates/core/datasets-common/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ impl Default for Schema {
}
}

/// Configuration for a bloom filter on a specific column.
///
/// Bloom filters enable efficient row-group pruning during queries by quickly
/// determining whether a row group could contain a specific value. The `ndv`
/// (number of distinct values) parameter tunes the filter's false-positive rate.
// `PartialEq`/`Eq` let callers compare and deduplicate configs (e.g. in tests
// or when diffing manifests); all fields are `Eq`-compatible.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
pub struct BloomFilterColumnConfig {
    /// Column name to apply the bloom filter to.
    pub column: String,
    /// Estimated number of distinct values for this column (default: 10,000).
    /// Tunes the filter's false-positive rate and on-disk size.
    #[serde(default = "default_bloom_filter_ndv")]
    pub ndv: u64,
}

// Default NDV (number of distinct values) for a bloom-filter column. The stock
// parquet default of 1_000_000 adds roughly a megabyte of storage per column,
// per row group; this analysis by InfluxData suggests that smaller NDV values
// may be equally effective:
// https://www.influxdata.com/blog/using-parquets-bloom-filters/
fn default_bloom_filter_ndv() -> u64 {
    10_000
}

/// Apache Arrow data _new-type_ wrapper with JSON schema support.
///
/// This wrapper provides serialization and JSON schema generation capabilities
Expand Down
27 changes: 25 additions & 2 deletions crates/core/datasets-raw/src/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! Concrete raw dataset and table types.

use std::{collections::BTreeSet, sync::Arc};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};

use arrow::datatypes::SchemaRef;
use datasets_common::{
block_num::{BlockNum, RESERVED_BLOCK_NUM_COLUMN_NAME},
dataset::Table as TableTrait,
dataset_kind_str::DatasetKindStr,
hash_reference::HashReference,
manifest::BloomFilterColumnConfig,
network_id::NetworkId,
table_name::TableName,
};
Expand Down Expand Up @@ -37,7 +41,8 @@ impl Dataset {
reference: HashReference,
kind: DatasetKindStr,
network: NetworkId,
tables: Vec<Table>,
mut tables: Vec<Table>,
manifest_tables: &BTreeMap<String, crate::manifest::Table>,
start_block: Option<BlockNum>,
finalized_blocks_only: bool,
) -> Self {
Expand All @@ -50,6 +55,13 @@ impl Dataset {
"all tables must belong to the same network as the dataset"
);

// Apply per-table bloom filter config from the manifest.
for table in &mut tables {
if let Some(mt) = manifest_tables.get(table.name().as_str()) {
table.set_bloom_filter_columns(mt.bloom_filter_columns.clone());
}
}

let tables: Vec<Arc<dyn TableTrait>> = tables
.into_iter()
.map(|t| Arc::new(t) as Arc<dyn TableTrait>)
Expand Down Expand Up @@ -111,6 +123,7 @@ pub struct Table {
schema: SchemaRef,
network: NetworkId,
sorted_by: BTreeSet<String>,
bloom_filter_columns: Vec<BloomFilterColumnConfig>,
}

impl Table {
Expand All @@ -128,13 +141,19 @@ impl Table {
schema,
network,
sorted_by,
bloom_filter_columns: Vec::new(),
}
}

/// Returns the network this raw table belongs to.
pub fn network_ref(&self) -> &NetworkId {
&self.network
}

/// Sets the bloom filter column configurations for this table.
///
/// Replaces any previously configured columns. Called during `Dataset`
/// construction to apply per-table bloom filter config from the manifest.
pub fn set_bloom_filter_columns(&mut self, columns: Vec<BloomFilterColumnConfig>) {
    self.bloom_filter_columns = columns;
}
}

impl TableTrait for Table {
Expand All @@ -153,4 +172,8 @@ impl TableTrait for Table {
fn sorted_by(&self) -> &BTreeSet<String> {
&self.sorted_by
}

/// Per-column bloom filter settings applied to this table (empty unless set
/// via `set_bloom_filter_columns`).
fn bloom_filter_columns(&self) -> &[BloomFilterColumnConfig] {
    self.bloom_filter_columns.as_slice()
}
}
10 changes: 9 additions & 1 deletion crates/core/datasets-raw/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

use std::collections::BTreeMap;

use datasets_common::{block_num::BlockNum, manifest::TableSchema, network_id::NetworkId};
use datasets_common::{
block_num::BlockNum,
manifest::{BloomFilterColumnConfig, TableSchema},
network_id::NetworkId,
};

use crate::dataset_kind::{
EvmRpcDatasetKind, FirehoseDatasetKind, SolanaDatasetKind, TempoDatasetKind,
Expand Down Expand Up @@ -57,6 +61,10 @@ pub struct Table {
pub schema: TableSchema,
/// Network for this table.
pub network: NetworkId,
/// Columns to apply bloom filters to for row-group pruning.
/// Omit to disable bloom filters for this table.
#[serde(default)]
pub bloom_filter_columns: Vec<BloomFilterColumnConfig>,
Copy link
Contributor

@LNSD LNSD Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should validate that the columns in the vector match those in the table schema at manifest registration time

}

/// Schema generation types, gated behind the `schemars` feature.
Expand Down
3 changes: 0 additions & 3 deletions crates/core/worker-core/src/compaction/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use super::algorithm::Overflow;
pub struct ParquetConfig {
/// Compression algorithm: zstd, lz4, gzip, brotli, snappy, uncompressed (default: zstd(1))
pub compression: Compression,
/// Enable bloom filters (default: false)
pub bloom_filters: bool,
/// Parquet metadata cache size in MB (default: 1024)
pub cache_size_mb: u64,
/// Max row group size in MB (default: 512)
Expand All @@ -26,7 +24,6 @@ impl Default for ParquetConfig {
fn default() -> Self {
Self {
compression: Compression::ZSTD(ZstdLevel::default()),
bloom_filters: false,
cache_size_mb: 1024, // 1 GB
max_row_group_mb: 512, // 512 MB
target_size: SizeLimitConfig::default(),
Expand Down
37 changes: 27 additions & 10 deletions crates/core/worker-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{sync::Arc, time::Duration};

use datafusion::parquet::file::properties::WriterProperties as ParquetWriterProperties;
use datasets_common::manifest::BloomFilterColumnConfig;

pub mod block_ranges;
pub mod check;
Expand Down Expand Up @@ -51,23 +52,14 @@ pub struct WriterProperties {

pub fn parquet_opts(config: impl Into<ParquetConfig>) -> Arc<WriterProperties> {
let config = config.into();
// We have not done our own benchmarking, but the default 1_000_000 value for this adds about a
// megabyte of storage per column, per row group. This analysis by InfluxData suggests that
// smaller NDV values may be equally effective:
// https://www.influxdata.com/blog/using-parquets-bloom-filters/
let bloom_filter_ndv = 10_000;

// For DataFusion defaults, see `ParquetOptions` here:
// https://github.com/apache/arrow-datafusion/blob/main/datafusion/common/src/config.rs
//
// Note: We could set `sorting_columns` for columns like `block_num` and `ordinal`. However,
// Datafusion doesn't actually read that metadata info anywhere and just relies on the
// `file_sort_order` set on the reader configuration.
let parquet = ParquetWriterProperties::builder()
.set_compression(config.compression)
.set_bloom_filter_ndv(bloom_filter_ndv)
.set_bloom_filter_enabled(config.bloom_filters)
.build();
let parquet = build_parquet_writer_properties(config.compression, &[]);

let collector = CollectorProperties::from(&config);
let compactor = CompactorProperties::from(&config);
Expand All @@ -88,3 +80,28 @@ pub fn parquet_opts(config: impl Into<ParquetConfig>) -> Arc<WriterProperties> {
}
.into()
}

/// Builds `ParquetWriterProperties` with per-column bloom filter configuration.
///
/// When `bloom_filter_columns` is non-empty, bloom filters are enabled only on
/// the specified columns with their configured NDV values. When empty, no bloom
/// filters are written.
pub fn build_parquet_writer_properties(
    compression: datafusion::parquet::basic::Compression,
    bloom_filter_columns: &[BloomFilterColumnConfig],
) -> ParquetWriterProperties {
    let mut props = ParquetWriterProperties::builder().set_compression(compression);

    if !bloom_filter_columns.is_empty() {
        // Turn the global bloom filter flag off so that only the columns listed
        // below get filters, each enabled individually with its own NDV.
        props = props.set_bloom_filter_enabled(false);
        for cfg in bloom_filter_columns {
            props = props
                .set_column_bloom_filter_enabled(cfg.column.clone().into(), true)
                .set_column_bloom_filter_ndv(cfg.column.clone().into(), cfg.ndv);
        }
    }

    props.build()
}
25 changes: 21 additions & 4 deletions crates/core/worker-datasets-raw/src/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant};
use amp_data_store::retryable::RetryableErrorExt as _;
use amp_providers_registry::retryable::RetryableErrorExt as _;
use amp_worker_core::{
ResolvedEndBlock,
ResolvedEndBlock, WriterProperties,
block_ranges::resolve_end_block,
check::consistency_check,
compaction::AmpCompactor,
Expand Down Expand Up @@ -142,11 +142,28 @@ pub async fn execute(
let dataset_reference = dataset.reference();

let materialize_start_time = Instant::now();
let parquet_opts = amp_worker_core::parquet_opts(ctx.config.parquet_writer.clone());
let global_parquet_opts = amp_worker_core::parquet_opts(ctx.config.parquet_writer.clone());

// Initialize physical tables and compactors
let mut tables: Vec<(Arc<PhysicalTable>, Arc<AmpCompactor>)> = vec![];
let mut per_table_opts: BTreeMap<TableName, Arc<WriterProperties>> = BTreeMap::new();
for table_def in dataset.tables() {
// Build per-table WriterProperties with table-specific bloom filter columns
let bloom_filter_columns = table_def.bloom_filter_columns();
let table_opts = if bloom_filter_columns.is_empty() {
global_parquet_opts.clone()
} else {
let table_parquet = amp_worker_core::build_parquet_writer_properties(
ctx.config.parquet_writer.compression,
bloom_filter_columns,
);
Arc::new(WriterProperties {
parquet: table_parquet,
..(*global_parquet_opts).clone()
})
};
per_table_opts.insert(table_def.name().clone(), table_opts.clone());

// Try to get existing active physical table (handles retry case)
let revision = match ctx
.data_store
Expand Down Expand Up @@ -176,7 +193,7 @@ pub async fn execute(
let compactor = AmpCompactor::start(
ctx.metadata_db.clone(),
ctx.data_store.clone(),
parquet_opts.clone(),
table_opts,
physical_table.clone(),
ctx.metrics.clone(),
)
Expand Down Expand Up @@ -367,7 +384,7 @@ pub async fn execute(
&client,
&ctx,
&catalog,
parquet_opts.clone(),
per_table_opts.clone(),
missing_ranges_by_table,
compactors_by_table,
&tables,
Expand Down
10 changes: 5 additions & 5 deletions crates/core/worker-datasets-raw/src/job_impl/ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(super) async fn materialize_ranges<S: BlockStreamer + Send + Sync>(
client: &S,
ctx: &Context,
catalog: &Catalog,
parquet_opts: Arc<WriterProperties>,
per_table_opts: BTreeMap<TableName, Arc<WriterProperties>>,
missing_ranges_by_table: BTreeMap<TableName, Vec<RangeInclusive<BlockNum>>>,
compactors_by_table: BTreeMap<TableName, Arc<AmpCompactor>>,
tables: &[(Arc<PhysicalTable>, Arc<AmpCompactor>)],
Expand Down Expand Up @@ -113,7 +113,7 @@ pub(super) async fn materialize_ranges<S: BlockStreamer + Send + Sync>(
data_store: ctx.data_store.clone(),
catalog: catalog.clone(),
ranges,
parquet_opts: parquet_opts.clone(),
per_table_opts: per_table_opts.clone(),
missing_ranges_by_table: missing_ranges_by_table.clone(),
compactors_by_table: compactors_by_table.clone(),
id: i as u32,
Expand Down Expand Up @@ -418,8 +418,8 @@ struct MaterializePartition<S: BlockStreamer> {
catalog: Catalog,
/// The block ranges to scan
ranges: Vec<RangeInclusive<BlockNum>>,
/// The Parquet writer properties
parquet_opts: Arc<WriterProperties>,
/// Per-table Parquet writer properties
per_table_opts: BTreeMap<TableName, Arc<WriterProperties>>,
/// The missing block ranges by table
missing_ranges_by_table: BTreeMap<TableName, Vec<RangeInclusive<BlockNum>>>,
/// The compactors for each table
Expand Down Expand Up @@ -505,7 +505,7 @@ impl<S: BlockStreamer> MaterializePartition<S> {
self.catalog.clone(),
self.metadata_db.clone(),
self.data_store.clone(),
self.parquet_opts.clone(),
self.per_table_opts.clone(),
missing_ranges_by_table,
self.compactors_by_table.clone(),
self.metrics.clone(),
Expand Down
Loading
Loading