Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3374,3 +3374,42 @@ fn test_filter_pushdown_through_sort_with_projection() {
"
);
}

/// Regression test: applying `FilterPushdown::new_post_optimization()` a
/// second time to its own output must leave the plan unchanged.
///
/// For a `HashJoinExec` the rule installs a dynamic filter on the probe-side
/// scan. Before the fix in `HashJoinExec::gather_filters_for_pushdown`, each
/// invocation built a *new* `DynamicFilterPhysicalExpr` and ANDed it onto the
/// probe side's existing predicate, so N passes yielded
/// `DynamicFilter AND DynamicFilter AND ...`.
///
/// AQE (datafusion-ballista#1359) re-runs the optimizer chain after every
/// completed stage, so without the guard the predicate would keep growing.
#[test]
fn post_phase_is_idempotent_on_hash_join() {
    use crate::physical_optimizer::test_utils::{hash_join_exec, parquet_exec, schema};
    use datafusion_common::JoinType;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
    use datafusion_physical_plan::get_plan_string;
    use datafusion_physical_plan::joins::utils::JoinOn;

    // Two parquet scans over the same schema, inner-joined on column `a`.
    let table_schema = schema();
    let left_scan = parquet_exec(Arc::clone(&table_schema));
    let right_scan = parquet_exec(Arc::clone(&table_schema));
    let left_key = Column::new_with_schema("a", &left_scan.schema()).unwrap();
    let right_key = Column::new_with_schema("a", &right_scan.schema()).unwrap();
    let join_on: JoinOn = vec![(Arc::new(left_key), Arc::new(right_key))];
    let join =
        hash_join_exec(left_scan, right_scan, join_on, None, &JoinType::Inner).unwrap();

    // Run the Post-phase rule, then run it again on its own output.
    let options = ConfigOptions::new();
    let pushdown = FilterPushdown::new_post_optimization();
    let first_pass = pushdown.optimize(join, &options).unwrap();
    let second_pass = pushdown
        .optimize(Arc::clone(&first_pass), &options)
        .unwrap();

    // Render both plans only after both passes have completed, then compare.
    let rendered_first = get_plan_string(&first_pass);
    let rendered_second = get_plan_string(&second_pass);
    assert_eq!(
        rendered_first, rendered_second,
        "second invocation of FilterPushdown::new_post_optimization mutated the plan",
    );
}
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1640,8 +1640,14 @@ impl ExecutionPlan for HashJoinExec {
ChildFilterDescription::all_unsupported(&parent_filters)
};

// Add dynamic filters in Post phase if enabled
// Add dynamic filters in Post phase if enabled. Skip when this join
// already carries a dynamic filter from a previous pass — the shared
// `Arc<DynamicFilterPhysicalExpr>` is still wired into the probe-side
// scan's predicate, and re-creating it would AND a fresh duplicate
// onto every Post-phase invocation (apache/datafusion-ballista#1359
// surfaces this in AQE replan loops).
if phase == FilterPushdownPhase::Post
&& self.dynamic_filter.is_none()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I don't know much about this piece of code. This might produce the correct result, but I'm not sure it is the correct way to do it.

&& self.allow_join_dynamic_filter_pushdown(config)
{
// Add actual dynamic filter to right side (probe side)
Expand Down
Loading