feat: enable pickling of most Expr except udaf and udwf #1544
Conversation
Adds Python-aware encoding to `PythonLogicalCodec`/`PythonPhysicalCodec` so a `ScalarUDF` defined in Python travels inside the serialized expression (cloudpickled into `fun_definition`) instead of needing a matching registration on the receiver. With that in place, `Expr` gains `__reduce__` plus a `classmethod from_bytes(buf, ctx=None)` so `pickle.dumps` / `pickle.loads` work end-to-end on expressions built from `col`, `lit`, built-in functions, and Python scalar UDFs.

The wire format is framed as `<DFPYUDF magic, version byte, cloudpickle tuple>`; the version byte lets a too-new/too-old payload surface a clean Execution error instead of an opaque cloudpickle unpack failure. Schema serde is via arrow-rs's native IPC (no pyarrow round-trip). The cloudpickle module handle is cached per interpreter through `PyOnceLock`.

Worker-side context resolution lives in a new `datafusion.ipc` module: `set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx` plus a private `_resolve_ctx` helper consulted by `Expr.from_bytes`. Priority is explicit ctx > worker ctx > global `SessionContext`.

FFI UDFs still travel by name and require the matching registration on the receiver's context. Aggregate and window UDF inline encoding, the per-session `with_python_udf_inlining` toggle, sender-side context, and the user-guide docs land in follow-on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
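For orientation, a minimal sketch of the round-trip this enables. The `udf` call follows datafusion-python's existing signature; the lambda body is illustrative.

```python
import pickle

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import col, udf

# A Python scalar UDF: the codec cloudpickles the callable into the
# payload, so the receiving side needs no matching registration.
double = udf(
    lambda arr: pc.multiply(arr, 2),
    [pa.int64()],
    pa.int64(),
    volatility="immutable",
    name="double",
)

expr = double(col("a"))
blob = pickle.dumps(expr)      # driven by Expr.__reduce__ -> to_bytes()
restored = pickle.loads(blob)  # from_bytes resolves a ctx (explicit >
                               # worker > global) via datafusion.ipc
```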
…edge-case tests
Inline `.. warning::` blocks on `Expr.to_bytes`, `Expr.from_bytes`, and
`Expr.__reduce__` so the cloudpickle / arbitrary-code-execution caveat is
visible at the public API surface in advance of the user-guide page that
lands in PR 4.
Add doctest-style `Examples:` blocks to `datafusion.ipc` functions
(`set_worker_ctx`, `clear_worker_ctx`, `get_worker_ctx`, `_resolve_ctx`),
`ScalarUDF.name`, and the new `Expr` pickle methods, per CLAUDE.md.
Tighten `Expr.__reduce__` return annotation to
`tuple[Callable[[bytes], Expr], tuple[bytes]]`.
Tests: multi-arg UDF round-trip (covers synthetic `arg_{i}` schema-field
loop in the codec) plus malformed-bytes paths through `Expr.from_bytes`.
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
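As a reading aid, a sketch of the `__reduce__` shape that the tightened annotation describes. This is illustrative, not the in-tree body.

```python
from typing import Callable


class Expr:
    def to_bytes(self) -> bytes: ...  # framed DFPYUDF payload

    @classmethod
    def from_bytes(cls, buf: bytes, *, ctx=None) -> "Expr": ...

    def __reduce__(self) -> tuple[Callable[[bytes], "Expr"], tuple[bytes]]:
        # pickle stores the callable plus its args; on load it invokes
        # Expr.from_bytes(blob), which resolves a context internally.
        return (Expr.from_bytes, (self.to_bytes(),))
```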
ntjohnson1 left a comment
The general shape of the changes here seems reasonable. I guess my only lingering thought/question is mostly around cloudpickle's capabilities and how it is used.
```rust
    })
}

/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.
```
Maybe it is captured more clearly somewhere else, but it feels like there is some nuance of the dependency on cloudpickle that's not fully communicated here. I didn't do too much of a deep dive on it.
- cloudpickle only works across the same version of Python (I'm not sure if it detects the mismatch with a nice error). So your header might want to capture the source Python version to give a nicer error, and to advertise that remote workers are limited to the same Python version; a sketch of what that could look like follows this list.
- cloudpickle seems to have serialize-by-reference (more like dill) and serialize-by-value (super cool). The former needs the function installed in the environment so it can be referenced when deserialized, where the latter tries to just capture all the necessary bits (here is where I didn't deep-dive a ton). Those are fairly different mental models to support.
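A hypothetical sketch of that first point: the `DFPYUDF` magic matches the PR's framing, but the extra version fields and helper names are invented for illustration.

```python
import sys

MAGIC = b"DFPYUDF"
CODEC_VERSION = 1


def frame(payload: bytes) -> bytes:
    # Record the sender's interpreter version next to the codec version byte.
    major, minor = sys.version_info[:2]
    return MAGIC + bytes([CODEC_VERSION, major, minor]) + payload


def unframe(buf: bytes) -> bytes:
    if not buf.startswith(MAGIC):
        raise ValueError("not a DFPYUDF payload")
    codec_version, major, minor = buf[len(MAGIC) : len(MAGIC) + 3]
    if codec_version != CODEC_VERSION:
        raise ValueError(f"unsupported codec version {codec_version}")
    if (major, minor) != sys.version_info[:2]:
        # Fail with a clear message before cloudpickle ever runs.
        raise ValueError(
            f"payload pickled on Python {major}.{minor}; "
            f"this worker runs {sys.version_info[0]}.{sys.version_info[1]}"
        )
    return buf[len(MAGIC) + 3 :]
```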
```python
def test_udf_self_contained_blob(self):
    e = _double_udf()(col("a"))
    blob = pickle.dumps(e)
    # The codec inlines the callable, so the blob is much bigger than a
```
I think this is testing the thing I was asking about, but I haven't thought deeply enough about whether it actually does. I know cloudpickle says it can serialize lambdas, but if I instead had
```python
from foo import double

import pyarrow as pa
from datafusion import udf


def _double_udf():
    return udf(
        double,
        [pa.int64()],
        pa.int64(),
        volatility="immutable",
        name="double",
    )
```

would I still be able to deserialize this on a remote worker in a Python environment without `foo`?
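Based on cloudpickle's documented behavior, a sketch of the two modes in question (`foo` is the hypothetical module from the snippet above):

```python
import cloudpickle

import foo  # hypothetical module providing double()

# Default: an importable function is pickled *by reference*; only the
# name "foo.double" travels, so a worker without foo installed fails
# with ModuleNotFoundError on load.
by_ref = cloudpickle.dumps(foo.double)

# Since cloudpickle 2.0, a module can be opted in to by-value capture,
# so the function body itself travels inside the blob.
cloudpickle.register_pickle_by_value(foo)
by_val = cloudpickle.dumps(foo.double)
cloudpickle.unregister_pickle_by_value(foo)
```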
Which issue does this PR close?
Addresses part of #1517
This is PR 1 of 4. The four PRs stack sequentially on top of this one; subsequent PRs target this branch's tip until it merges.
Follow-up PRs:
Rationale for this change
Today a `LogicalPlan` or `Expr` referencing a Python-defined `ScalarUDF` cannot survive a serialization round-trip without the receiver pre-registering a matching UDF, because the upstream protobuf codecs only carry the UDF name. That blocks shipping expressions to worker processes via `pickle.dumps` / `multiprocessing.Pool` / Ray actors / `datafusion-distributed`. This PR closes the scalar-UDF case end-to-end so the natural `pickle.dumps(expr)` pattern works for built-ins and Python scalar UDFs with no receiver-side setup.
What changes are included in this PR?
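A sketch of the pattern this unblocks, assuming the `datafusion.ipc` surface described in this PR; the worker-initializer wiring is illustrative.

```python
import multiprocessing as mp
import pickle

from datafusion import SessionContext, col, ipc, lit


def init_worker() -> None:
    # Give each worker a thread-local context that Expr.from_bytes
    # can resolve when no explicit ctx is passed.
    ipc.set_worker_ctx(SessionContext())


def describe(blob: bytes) -> str:
    return str(pickle.loads(blob))  # no per-UDF registration needed


if __name__ == "__main__":
    blob = pickle.dumps(col("a") + lit(1))
    with mp.Pool(2, initializer=init_worker) as pool:
        print(pool.map(describe, [blob, blob]))
```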
Adds Python-aware encoding to `PythonLogicalCodec` and `PythonPhysicalCodec`.
On the Python side, `Expr` gains `__reduce__` plus a `classmethod from_bytes(buf, ctx=None)`. A new `datafusion.ipc` module exposes `set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx` thread-locals; `_resolve_ctx` consults explicit ctx > worker ctx > global `SessionContext` (a sketch of this order follows below).
`cloudpickle>=2.0` is added as a runtime dependency (lazy-imported on the encode/decode hot path). This is a tiny dependency, in the kilobyte range.
Aggregate and window inline encoding, the per-session `with_python_udf_inlining` toggle, sender-side context wiring, and the user-guide docs land in PRs 2-4 of this series.
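A sketch of that resolution order; the names follow the PR description, but the body and the global-context accessor are assumptions, not the in-tree code.

```python
def _resolve_ctx(explicit_ctx=None):
    if explicit_ctx is not None:   # 1. ctx passed explicitly to from_bytes
        return explicit_ctx
    worker_ctx = get_worker_ctx()  # 2. thread-local ctx from set_worker_ctx
    if worker_ctx is not None:
        return worker_ctx
    return _global_session_ctx()   # 3. hypothetical accessor for the
                                   #    global SessionContext fallback
```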
Are there any user-facing changes?
Yes, but these are only additions.
- `Expr` is now picklable. Built-ins and Python scalar UDFs round-trip with no worker-side setup.
- `Expr.to_bytes(ctx=None)` / `Expr.from_bytes(buf, ctx=None)` shape: `from_bytes` is now a `classmethod` with `ctx` as a keyword-only `None` default. Breaking for any direct `Expr.from_bytes(ctx, blob)` callers; the in-tree call sites are updated.
- New `datafusion.ipc` module with `set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx`.
- New `ScalarUDF.name` property.
- New runtime dependency: `cloudpickle>=2.0`.
- `Expr.from_bytes` has a signature flip, but that is unreleased (only merged yesterday) and so not a change any user will experience.
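For clarity, the call-shape change described above; `blob` and `ctx` are placeholders.

```python
expr = Expr.from_bytes(blob, ctx=ctx)  # new: classmethod, ctx keyword-only
# expr = Expr.from_bytes(ctx, blob)    # old, unreleased positional shape
```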