Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f6be2fb
feat: per-session Python UDF inlining toggle + sender ctx + strict re…
timsaucer May 15, 2026
14178db
update uv lock
timsaucer May 21, 2026
eb3c194
docs: clarify Python UDF inlining docstring; drop unresolved :doc: refs
timsaucer May 21, 2026
7dd4252
docs: add doctest examples for sender ctx + UDF inlining toggle
timsaucer May 21, 2026
a24daf6
refactor: address review nits for UDF inlining toggle + sender ctx
timsaucer May 21, 2026
74405d8
refactor: keyword-only inlining flag, skip GIL on prefix mismatch
timsaucer May 21, 2026
e8413dd
Add dev dependency
timsaucer May 21, 2026
d00c619
Add testing for CI failure
timsaucer May 21, 2026
f7c8c6c
Additional debugging for mp tests in CI
timsaucer May 21, 2026
949af91
Set path for workers
timsaucer May 22, 2026
050c7f2
more path updates for unit tests
timsaucer May 22, 2026
0fc78b7
test(pickle): remove multiprocessing CI debug instrumentation
timsaucer May 22, 2026
8827726
Shorten rust side docstring since it's duplicative of the exposed pyt…
timsaucer May 22, 2026
bcc4755
docs: clarify strict-mode refusal message and to_bytes inlining docs
timsaucer May 22, 2026
16756f1
docs: clarify with_python_udf_inlining enabled arg is required
timsaucer May 26, 2026
e02357b
docs: demonstrate strict-mode refusal in with_python_udf_inlining doc…
timsaucer May 26, 2026
00d333b
docs: convert sender-ctx example to executable doctest
timsaucer May 26, 2026
a04ec5c
docs: use 'thread-local sender context' as adjectival phrase
timsaucer May 26, 2026
a41bf17
docs: drop trailing clear_sender_ctx from set_sender_ctx example
timsaucer May 26, 2026
4665983
Merge branch 'main' into pr3-toggle-sender-strict
timsaucer May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ env:
jobs:
test-matrix:
runs-on: ubuntu-latest
# Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
# should not block CI longer than this.
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
Expand Down
157 changes: 137 additions & 20 deletions crates/core/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,39 @@ fn strip_wire_header<'a>(
#[derive(Debug)]
pub struct PythonLogicalCodec {
inner: Arc<dyn LogicalExtensionCodec>,
python_udf_inlining: bool,
Comment thread
timsaucer marked this conversation as resolved.
}

impl PythonLogicalCodec {
pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
Self { inner }
Self {
inner,
python_udf_inlining: true,
}
}

pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
&self.inner
}

/// Toggle inline encoding of Python UDFs. See
/// `SessionContext.with_python_udf_inlining` (Python) for full
/// behavior and use cases.
///
/// Security scope: strict mode (`false`) narrows only the codec
/// layer — it stops `Expr::from_bytes` from invoking
/// `cloudpickle.loads` on the inline `DFPY*` payload. It does
/// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
/// `pickle.loads` on untrusted input as unsafe regardless of this
/// setting.
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
self.python_udf_inlining = enabled;
self
}

pub fn python_udf_inlining(&self) -> bool {
self.python_udf_inlining
}
}

impl Default for PythonLogicalCodec {
Expand Down Expand Up @@ -301,48 +324,104 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
}

fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_scalar_udf(node, buf)? {
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udf(node, buf)
}

fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
if self.python_udf_inlining {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
}
} else {
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
}
self.inner.try_decode_udf(name, buf)
}

fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udaf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udaf(node, buf)
}

fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
if self.python_udf_inlining {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
}
} else {
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
}
self.inner.try_decode_udaf(name, buf)
}

fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udwf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udwf(node, buf)
}

fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
if self.python_udf_inlining {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
}
} else {
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
}
self.inner.try_decode_udwf(name, buf)
}
}

/// Strict-mode gate: if `buf` is a well-framed inline payload for
/// `family`, return the strict-refusal error; otherwise return
/// `Ok(())` so the caller can delegate to its `inner` codec.
///
/// Routing through [`read_framed_payload`] (rather than a bare
/// `starts_with` probe) means malformed inline bytes — wrong
/// wire-format version, mismatched Python version, truncated header —
/// surface *their* diagnostic instead of the strict-mode message.
/// The strict message implies sender intent ("inlining is disabled"),
/// so it should fire only when the bytes really would have decoded.
///
/// Fast path: short-circuit on the family-magic prefix before
/// acquiring the GIL. Plans with many non-Python UDFs would otherwise
/// pay a GIL acquisition per decode call just to confirm "not a
/// Python UDF". `read_framed_payload` itself rejects buffers that
/// don't start with `family`, so this is purely an optimization.
fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
if !buf.starts_with(family) {
return Ok(());
}
Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
Some(_) => Err(refuse_inline_payload(kind, name)),
None => Ok(()),
})
}

/// Build the error returned by a strict codec when it receives an
/// inline Python-UDF payload it has been told not to deserialize.
fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
// `Execution`, not `Plan`: this is a wire-format decode refusal at
// codec time, not a planner-stage failure. Downstream error
// classification keys off the variant — surfacing this as a planner
// error would mis-route it into "fix your SQL" buckets.
datafusion::error::DataFusionError::Execution(format!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if there was a page on this so we could include a url with even more context. I think your descriptions in the previous two prs are good but I suspect someone will stumble upon this with very little context as a general user and thinking about how to mitigate that. Can be wrapped in next PR or even a follow on for further nits/clarifiaction.

"Refusing to deserialize inline Python {kind} '{name}': Python UDF \
inlining is disabled on this session. Two remediations: \
(1) ask the sender to re-encode with inlining disabled so '{name}' \
travels by name, and register '{name}' on this receiver; or \
(2) enable inlining on this receiver (accepts the cloudpickle \
execution risk on inbound payloads). Receivers cannot re-encode \
bytes they did not produce."
))
}

/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
/// on the same `SessionContext`. Carries the Python-aware encoding
/// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
Expand All @@ -358,16 +437,33 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
#[derive(Debug)]
pub struct PythonPhysicalCodec {
inner: Arc<dyn PhysicalExtensionCodec>,
python_udf_inlining: bool,
}

impl PythonPhysicalCodec {
pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
Self { inner }
Self {
inner,
python_udf_inlining: true,
}
}

pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
&self.inner
}

/// Toggle inline encoding of Python UDFs on this physical codec.
///
/// Mirrors [`PythonLogicalCodec::with_python_udf_inlining`]; see
/// that method for the full security and portability discussion.
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
self.python_udf_inlining = enabled;
self
}

pub fn python_udf_inlining(&self) -> bool {
self.python_udf_inlining
}
}

impl Default for PythonPhysicalCodec {
Expand All @@ -391,15 +487,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
}

fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_scalar_udf(node, buf)? {
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udf(node, buf)
}

fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
if self.python_udf_inlining {
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
return Ok(udf);
}
} else {
refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
}
self.inner.try_decode_udf(name, buf)
}
Expand All @@ -417,29 +517,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
}

fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udaf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udaf(node, buf)
}

fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
if self.python_udf_inlining {
if let Some(udaf) = try_decode_python_udaf(buf)? {
return Ok(udaf);
}
} else {
refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
}
self.inner.try_decode_udaf(name, buf)
}

fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
if try_encode_python_udwf(node, buf)? {
if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
return Ok(());
}
self.inner.try_encode_udwf(node, buf)
}

fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
if self.python_udf_inlining {
if let Some(udwf) = try_decode_python_udwf(buf)? {
return Ok(udwf);
}
} else {
refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
}
self.inner.try_decode_udwf(name, buf)
}
Expand Down Expand Up @@ -476,6 +584,9 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
/// the caller to delegate to its `inner` codec (and eventually the
/// `FunctionRegistry`).
pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
if !buf.starts_with(PY_SCALAR_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")?
else {
Expand Down Expand Up @@ -732,6 +843,9 @@ pub(crate) fn try_encode_python_udwf(node: &WindowUDF, buf: &mut Vec<u8>) -> Res
}

pub(crate) fn try_decode_python_udwf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> {
if !buf.starts_with(PY_WINDOW_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_WINDOW_UDF_FAMILY, "window UDF")?
else {
Expand Down Expand Up @@ -814,6 +928,9 @@ pub(crate) fn try_encode_python_udaf(node: &AggregateUDF, buf: &mut Vec<u8>) ->
}

pub(crate) fn try_decode_python_udaf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> {
if !buf.starts_with(PY_AGG_UDF_FAMILY) {
return Ok(None);
}
Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> {
let Some(payload) = read_framed_payload(py, buf, PY_AGG_UDF_FAMILY, "aggregate UDF")?
else {
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,22 @@ impl PySessionContext {
physical_codec,
})
}

pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
let logical_codec = Arc::new(
PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
.with_python_udf_inlining(enabled),
);
let physical_codec = Arc::new(
PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
.with_python_udf_inlining(enabled),
);
Self {
ctx: Arc::clone(&self.ctx),
logical_codec,
physical_codec,
}
}
}

impl PySessionContext {
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ dev = [
"pyarrow>=19.0.0",
"pygithub==2.5.0",
"pytest-asyncio>=0.23.3",
"pytest-timeout>=2.3.1",
"pytest>=7.4.4",
"pyyaml>=6.0.3",
"ruff>=0.15.1",
Expand Down
58 changes: 58 additions & 0 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1963,3 +1963,61 @@ def with_physical_extension_codec(
new = SessionContext.__new__(SessionContext)
new.ctx = new_internal
return new

def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
"""Control whether Python UDFs are embedded in serialized expressions.

``enabled`` is keyword-only and required: callers must pick a
mode explicitly. Fresh sessions inline UDFs (``enabled=True``
behavior) until this method overrides the toggle.

With ``enabled=True``, serialized expressions carry the Python
code for any scalar, aggregate, or window UDFs they reference.
The receiver rebuilds the UDFs from those bytes and does not
need to register them first.

With ``enabled=False``, serialized expressions store only the
UDF names. This has two uses:

* **Cross-language portability.** The bytes can be decoded by a
non-Python receiver, which must already have UDFs registered
under matching names.
* **Safer deserialization.** :meth:`Expr.from_bytes` will refuse
to rebuild Python UDFs rather than call ``cloudpickle.loads``
on untrusted input.

The setting affects :meth:`Expr.to_bytes` and
:meth:`Expr.from_bytes` whenever this session is passed as the
``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads`
do not pass a context, so to apply the setting through pickle,
register this session with
:func:`datafusion.ipc.set_sender_ctx` on the sender and
:func:`datafusion.ipc.set_worker_ctx` on the receiver.

.. warning:: Security
This setting narrows only :meth:`Expr.from_bytes`. Calling
:func:`pickle.loads` on untrusted bytes remains unsafe
regardless of the toggle.

Returns a new :class:`SessionContext` with the toggle applied;
the original session is unchanged.

Examples:
>>> import pyarrow as pa
>>> from datafusion import SessionContext, Expr, col, udf
>>> ctx = SessionContext()
>>> identity = udf(lambda a: a, [pa.int64()], pa.int64(),
... volatility="immutable", name="identity_demo")
>>> ctx.register_udf(identity)
>>> blob = identity(col("x")).to_bytes(ctx)
>>> strict = SessionContext().with_python_udf_inlining(enabled=False)
>>> try:
... Expr.from_bytes(blob, strict)
... except Exception as e:
... print("Refusing to deserialize" in str(e))
True
"""
new_internal = self.ctx.with_python_udf_inlining(enabled)
new = SessionContext.__new__(SessionContext)
new.ctx = new_internal
return new
Loading
Loading