diff --git a/datafusion/physical-expr/src/expressions/lambda.rs b/datafusion/physical-expr/src/expressions/lambda.rs index 5e6dca1a62667..9275821ae9150 100644 --- a/datafusion/physical-expr/src/expressions/lambda.rs +++ b/datafusion/physical-expr/src/expressions/lambda.rs @@ -21,6 +21,7 @@ use std::hash::Hash; use std::sync::Arc; use crate::{ + ScalarFunctionExpr, expressions::{Column, LambdaVariable}, physical_expr::PhysicalExpr, }; @@ -61,11 +62,16 @@ impl Hash for LambdaExpr { impl LambdaExpr { /// Create a new lambda expression with the given parameters and body pub fn try_new(params: Vec, body: Arc) -> Result { - if all_unique(¶ms) { - Ok(Self::new(params, body)) - } else { - plan_err!("lambda params must be unique, got ({})", params.join(", ")) + if !all_unique(¶ms) { + return plan_err!( + "lambda params must be unique, got ({})", + params.join(", ") + ); } + + check_async_udf(&body)?; + + Ok(Self::new(params, body)) } fn new(params: Vec, body: Arc) -> Self { @@ -179,6 +185,8 @@ impl PhysicalExpr for LambdaExpr { ); }; + check_async_udf(body)?; + Ok(Arc::new(Self::new(self.params.clone(), Arc::clone(body)))) } @@ -210,6 +218,20 @@ fn all_unique(params: &[String]) -> bool { } } +fn check_async_udf(body: &Arc) -> Result<()> { + if body.exists(|expr| { + Ok(expr + .downcast_ref::() + .is_some_and(|udf| udf.fun().as_async().is_some())) + })? { + return plan_err!( + "Async functions in lambdas aren't supported, see https://github.com/apache/datafusion/issues/22091" + ); + } + + Ok(()) +} + #[cfg(test)] mod tests { use crate::expressions::{NoOp, lambda::lambda}; diff --git a/datafusion/sqllogictest/test_files/async_udf.slt b/datafusion/sqllogictest/test_files/async_udf.slt index 0708b59e519a0..683c2a13e6328 100644 --- a/datafusion/sqllogictest/test_files/async_udf.slt +++ b/datafusion/sqllogictest/test_files/async_udf.slt @@ -99,3 +99,9 @@ physical_plan 01)ProjectionExec: expr=[__async_fn_0@1 as async_abs(data.x)] 02)--AsyncFuncExec: async_expr=[async_expr(name=__async_fn_0, expr=async_abs(x@0))] 03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Async udf can't be used in lambdas +query error +select array_transform([1], v -> async_abs(v)); +---- +DataFusion error: Error during planning: Async functions in lambdas aren't supported, see https://github.com/apache/datafusion/issues/22091