perf: Optimize some decimal expressions #3619
Conversation
Replace the 4-node expression tree (Cast→BinaryExpr→Cast→Cast) used for Decimal128 arithmetic that may overflow with a single fused expression that performs i256 register arithmetic directly. This reduces per-batch allocation from 4 intermediate arrays (112 bytes/elem) to 1 output array (16 bytes/elem). The new WideDecimalBinaryExpr evaluates children, performs add/sub/mul using i256 intermediates via try_binary, applies scale adjustment with HALF_UP rounding, checks precision bounds, and outputs a single Decimal128 array. Follows the same pattern as decimal_div.
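The fused evaluation described above can be sketched in pure std Rust. The real implementation widens `Decimal128` (`i128`) into arrow's `i256`; since `i256` lives in `arrow_buffer`, this hypothetical sketch instead widens `i64` into `i128` to show the same fused flow: multiply in a wide register, rescale with HALF_UP rounding, then check the precision bound. All names here are illustrative, not the PR's actual API.

```rust
/// Hypothetical sketch of the fused multiply path.
/// `i64 -> i128` widening stands in for `Decimal128 -> i256`.
fn fused_decimal_mul(
    l: i64,                // lhs unscaled value
    r: i64,                // rhs unscaled value
    scale_delta: u32,      // digits to drop: (s_lhs + s_rhs) - s_out
    precision_bound: i128, // 10^p - 1 for the output precision
) -> Option<i128> {
    // 1. Multiply in the wide register: cannot overflow i128 for i64 inputs.
    let wide = (l as i128) * (r as i128);
    // 2. Rescale with HALF_UP rounding (add half the divisor, away from zero).
    let divisor = 10i128.checked_pow(scale_delta)?;
    let half = divisor / 2;
    let adjusted = if wide < 0 { wide - half } else { wide + half };
    let scaled = adjusted / divisor; // Rust integer division truncates toward zero
    // 3. Precision check: caller maps None to an error or a null.
    if scaled.abs() > precision_bound {
        None
    } else {
        Some(scaled)
    }
}
```

For example, `1.25 * 1.25` with two digits dropped yields the unscaled value `156` (i.e. `1.56`), with no intermediate arrays materialized between the three steps.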
Add benchmark comparing old Cast->BinaryExpr->Cast chain vs fused WideDecimalBinaryExpr for Decimal128 add/sub/mul. Covers four cases: add with same scale, add with different scales, multiply, and subtract.
Force-pushed from cb52636 to d7495bd.
@sqlbenchmark run tpch --iterations 3
Eliminate redundant CheckOverflow when wrapping WideDecimalBinaryExpr (which already handles overflow). Fuse Cast(Decimal128→Decimal128) + CheckOverflow into a single DecimalRescaleCheckOverflow expression that rescales and validates precision in one pass.
Force-pushed from 5a21500 to 91092a6.
@sqlbenchmark run tpch --iterations 3
Benchmark job
This is awesome!
```rust
let rescale_divisor = if need_rescale {
    i256_pow10((max_scale - s_out) as u32)
} else {
    i256::ONE
```
Don't we need to scale up to s_out if s_out > max_scale?
```rust
ColumnarValue::Scalar(ScalarValue::Decimal128(v, _precision, _scale)) => {
    let new_v = v.and_then(|val| {
        rescale_and_check(val, delta, scale_factor, bound, fail_on_error)
            .ok()
```
If fail_on_error=true, then this `.ok()` will convert the `Err` to `None` and silently hide the error.
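The pitfall flagged here can be reproduced in a minimal pure-std sketch. `rescale_and_check` below is a hypothetical stand-in with a simplified signature, not the PR's function; the point is that `.ok()` inside `and_then` collapses `Err` into `None`, whereas `map(...).transpose()` lets the error escape the `Option`.

```rust
// Hypothetical stand-in for rescale_and_check: fails when out of bounds.
fn rescale_and_check(v: i128, bound: i128) -> Result<i128, String> {
    if v.abs() > bound {
        Err(format!("value {v} exceeds bound {bound}"))
    } else {
        Ok(v)
    }
}

// Pattern from the diff: `.ok()` turns Err into None, so even with
// fail_on_error=true the caller sees a null instead of an error.
fn swallowed(v: Option<i128>, bound: i128) -> Option<i128> {
    v.and_then(|val| rescale_and_check(val, bound).ok())
}

// Propagating alternative: transpose lifts the Result out of the Option,
// so Option<Result<T, E>> becomes Result<Option<T>, E>.
fn propagated(v: Option<i128>, bound: i128) -> Result<Option<i128>, String> {
    v.map(|val| rescale_and_check(val, bound)).transpose()
}
```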
```rust
fn evaluate(&self, batch: &RecordBatch) -> datafusion::common::Result<ColumnarValue> {
    let arg = self.child.evaluate(batch)?;
    let delta = self.output_scale - self.input_scale;
```
The scales could be negative. Say output_scale=38 (the maximum supported one) and input_scale=-1; then delta=39.
This will overflow below at `10i128.pow(abs_delta as u32)`.
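A sketch of the guard this comment is asking for, using only std: scales are signed and may be negative, so the delta can exceed 38 even when both scales are individually valid, and `checked_pow` turns the would-be panic into a recoverable `None`. The function name is hypothetical.

```rust
// Decimal scales are i8 in Arrow and may be negative, so the rescale
// delta can exceed 38 (e.g. 38 - (-1) = 39) even with valid inputs.
fn checked_scale_factor(input_scale: i8, output_scale: i8) -> Option<i128> {
    let delta = output_scale as i32 - input_scale as i32;
    let abs_delta = delta.unsigned_abs();
    // 10^39 > i128::MAX (~1.7e38): checked_pow returns None instead of panicking.
    10i128.checked_pow(abs_delta)
}
```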
```rust
/// Maximum absolute value for a given decimal precision: 10^p - 1.
#[inline]
fn precision_bound(precision: u8) -> i128 {
    10i128.pow(precision as u32) - 1
```
Maybe add some validation that precision is <= 38?
Or use `checked_pow()` and return a `Result`.
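One way to combine both of the review's options in a hedged sketch (error type and messages are illustrative, not the project's): validate the Decimal128 precision limit up front, and still use `checked_pow` as a belt-and-braces fallback.

```rust
/// Hardened variant: Decimal128 precision is at most 38, and checked_pow
/// guards against any remaining overflow instead of panicking.
fn precision_bound(precision: u8) -> Result<i128, String> {
    if precision > 38 {
        return Err(format!("Decimal128 precision {precision} exceeds 38"));
    }
    10i128
        .checked_pow(precision as u32)
        .map(|p| p - 1)
        .ok_or_else(|| "10^precision overflowed i128".to_string())
}
```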
```rust
/// Compute 10^exp as i256.
#[inline]
fn i256_pow10(exp: u32) -> i256 {
```
Extreme case: `exp > 77` will overflow.
```rust
DataFusionOperator::Plus | DataFusionOperator::Minus | DataFusionOperator::Multiply,
Ok(DataType::Decimal128(p1, s1)),
Ok(DataType::Decimal128(p2, s2)),
Ok(DataType::Decimal128(_p1, _s1)),
```
What is the reason to use the `_` prefix? The variables are used.
```rust
// divisor = 10^(-delta), half = divisor / 2
let divisor = scale_factor; // already 10^abs(delta)
let half = divisor / 2;
let sign = if value < 0 { -1i128 } else { 1i128 };
```
Suggested change:

```diff
-let sign = if value < 0 { -1i128 } else { 1i128 };
+let sign = value.signum();
```
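One subtlety worth noting about this suggestion: `signum()` returns 0 for 0, while the hand-rolled branch returns 1. For the HALF_UP rounding shown above the two still agree, because the half-adjustment of zero is zero either way (`half < divisor`, so `half / divisor == 0`). A minimal pure-std sketch of the rounding with `signum` (function name hypothetical):

```rust
/// HALF_UP rounding: add half the divisor away from zero, then truncate.
/// signum() gives 0 for value == 0, which still yields 0 after division.
fn round_half_up(value: i128, divisor: i128) -> i128 {
    let half = divisor / 2;
    (value + value.signum() * half) / divisor
}
```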
```rust
}

// Fuse Cast(Decimal128→Decimal128) + CheckOverflow into single rescale+check
if let Some(cast) = child.as_any().downcast_ref::<Cast>() {
```
Should this also check that the Cast's precision/scale match the output's p/s before fusing?
```rust
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn PhysicalExpr>>,
) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
```
Suggested change:

```diff
 ) -> datafusion::common::Result<Arc<dyn PhysicalExpr>> {
+    if children.len() != 1 {
+        return Err(DataFusionError::Internal(format!(
+            "DecimalRescaleCheckOverflow expects 1 child, got {}",
+            children.len()
+        )));
+    }
```
```rust
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
```
Suggested change:

```diff
 ) -> Result<Arc<dyn PhysicalExpr>> {
+    if children.len() != 2 {
+        return Err(DataFusionError::Internal(format!(
+            "WideDecimalBinaryExpr expects 2 children, got {}",
+            children.len()
+        )));
+    }
```
Summary
For some decimal operations, we cast the inputs to Decimal256, perform the math operation, and then cast the result back to Decimal128.
This causes intermediate allocations and can be optimized with a custom `WideDecimalBinaryExpr` that performs i256 register arithmetic directly, reducing per-batch allocation from 4 intermediate arrays (3 Decimal256 @ 32 bytes/elem + 1 Decimal128 @ 16 bytes/elem = 112 bytes/elem) to 1 output array (16 bytes/elem).

TPC-H q1
Before:
After:
Criterion benchmark results (8192 element batches)
How it works
`WideDecimalBinaryExpr` evaluates left/right children, performs add/sub/mul using `i256` intermediates via `arrow::compute::kernels::arity::try_binary`, applies scale adjustment with HALF_UP rounding, checks precision bounds, and outputs a single `Decimal128` array. Follows the same pattern as `decimal_div` in `div.rs`.

Overflow handling matches existing behavior:

- `ArrowError::ComputeError`
- `i128::MAX` sentinel + `null_if_overflow_precision`
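The two overflow modes listed above can be sketched in pure std Rust. This is an illustrative model, not the PR's code: `Err(String)` stands in for `ArrowError::ComputeError`, and the `i128::MAX` sentinel models the value that a later `null_if_overflow_precision`-style pass would turn into a null.

```rust
/// Sketch of the two overflow behaviors (names and error type hypothetical):
/// fail mode surfaces an error; non-fail mode writes an i128::MAX sentinel
/// that downstream code converts to null.
fn handle_overflow(value: i128, bound: i128, fail_on_error: bool) -> Result<i128, String> {
    if value.abs() > bound {
        if fail_on_error {
            Err(format!("Decimal overflow: {value}")) // models ArrowError::ComputeError
        } else {
            Ok(i128::MAX) // sentinel, nulled downstream
        }
    } else {
        Ok(value)
    }
}
```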