feat: inline encoding for Python aggregate and window UDFs (2/4)#1545

Draft

timsaucer wants to merge 2 commits into apache:main from timsaucer:pr2-agg-window-inline

Conversation

@timsaucer
Member

Which issue does this PR close?

No associated issue. PR 2 of 4 stacked on #1544. The diff shown against `main` is cumulative until #1544 merges — review the commits on `pr2-agg-window-inline` directly, or wait for #1544 to merge for a clean diff.

Rationale for this change

PR 1 closed the round-trip for scalar UDFs. The same shipped-expression problem applies to Python aggregate and window UDFs: their accumulator / partition-evaluator factory is a Python callable, so a receiver that only has the UDF name cannot reconstruct one. This PR extends the inline-encoding mechanism so the natural `pickle.dumps(expr)` pattern also works for expressions referencing Python UDAFs and UDWFs.
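To see why shipping only the UDF name cannot work, note that the standard pickler cannot serialize the factory callable at all when it is a local closure, which is the typical shape of an accumulator factory. A minimal stdlib-only sketch (names are illustrative, not from this PR; the codec solves this by cloudpickling the callable into the payload):

```python
import pickle

def make_counting_accumulator():
    # A typical accumulator factory: a closure over local state.
    total = 0

    def update(value):
        nonlocal total
        total += value
        return total

    return update

update = make_counting_accumulator()
try:
    pickle.dumps(update)  # plain pickle rejects local closures
    serializable = True
except (pickle.PicklingError, AttributeError):
    serializable = False
```

A receiver holding only the string name of the UDF has no way to reconstruct `update`, so the callable itself has to travel in the serialized expression.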

What changes are included in this PR?

The codec extension is a straight parallel of the scalar path. The new wire-format families are:

| Kind | Family magic | Cloudpickle tuple shape |
| --- | --- | --- |
| Agg | `DFPYUDA` | `(name, accumulator_factory, input_schema_bytes, return_schema_bytes, state_schema_bytes, volatility_str)` |
| Window | `DFPYUDW` | `(name, evaluator_factory, input_schema_bytes, return_schema_bytes, volatility_str)` |

The aggregate state schema is encoded as a full IPC schema (not a positional `Vec`), so the post-decode UDF reports the same names, nullability, and metadata as the sender. This matters for accumulators whose `StateFieldsArgs` consumers key off names rather than positions.
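The framing itself (family magic, then a version byte, then the cloudpickle blob, as described in the scalar-UDF commit below) can be pictured with a hedged, stdlib-only sketch; the function names here are illustrative, not the actual codec API:

```python
# Hypothetical sketch of the framed wire format: magic + version + payload.
DFPYUDA = b"DFPYUDA"  # aggregate family magic
DFPYUDW = b"DFPYUDW"  # window family magic
VERSION = 1

def encode_frame(magic, payload, version=VERSION):
    return magic + bytes([version]) + payload

def decode_frame(buf, magic, version=VERSION):
    if not buf.startswith(magic):
        raise ValueError("unrecognized UDF family magic")
    got = buf[len(magic)]
    if got != version:
        # A version check surfaces a clean error on a too-new/too-old
        # payload instead of an opaque cloudpickle unpack failure.
        raise ValueError(f"unsupported payload version {got}")
    return buf[len(magic) + 1:]

frame = encode_frame(DFPYUDA, b"<cloudpickle tuple bytes>")
payload = decode_frame(frame, DFPYUDA)
```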

To let the codec downcast and grab the Python callable directly, two existing UDF impls are restructured:

  • `udaf.rs`: introduces a named `PythonFunctionAggregateUDF` that stores the `Py<PyAny>` accumulator factory. `PyAggregateUDF.new` now wires `AggregateUDF::new_from_impl(PythonFunctionAggregateUDF::new(...))` instead of the prior `create_udaf` + closure path. State field names default to synthesized `state_{i}` on the Python constructor path; `from_parts` (called by the decoder) restores the full schema from the IPC payload.
  • `udwf.rs`: renames `MultiColumnWindowUDF` → `PythonFunctionWindowUDF` and drops the `PartitionEvaluatorFactory` + `PtrEq` wrapping. It stores the `Py<PyAny>` evaluator directly. `PartialEq` / `Hash` pick up the same pointer-identity fast path and `eq` exception-logging behavior as `PythonFunctionScalarUDF` in PR 1.
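The pointer-identity fast path with exception-logging fallback can be pictured in Python terms (a hedged sketch; the real implementation is Rust over `Py<PyAny>`, and the helper name here is invented for illustration):

```python
def callables_equal(a, b):
    # Fast path: the same Python object is trivially equal, with no
    # callback into the interpreter's __eq__ needed.
    if a is b:
        return True
    try:
        return bool(a == b)
    except Exception:
        # Mirror the "log the exception and treat as not equal"
        # behavior rather than panicking out of a PartialEq impl.
        return False

f = lambda x: x
```

Comparing by identity first avoids acquiring interpreter-level equality for the common case where both sides hold the very same callable, which is exactly what happens when an expression is cloned within one process.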

User-facing surface:

  • `AggregateUDF.name` and `WindowUDF.name` properties (parallel to `ScalarUDF.name` from PR 1).
  • Existing UDAF/UDWF construction paths are unchanged at the user level — same constructors, same arguments, same semantics. The internal impl swap is invisible.

Are there any user-facing changes?

  • Python aggregate and window UDFs survive `pickle.dumps` / `pickle.loads` and `Expr.to_bytes` / `Expr.from_bytes` round-trips. The decoded UDF reproduces the original state schema and runs end-to-end (verified by `test_agg_udf_evaluates_after_roundtrip`, which aggregates over a 5-row frame after a pickle round-trip).
  • `AggregateUDF.name` and `WindowUDF.name` are new public properties.
  • `MultiColumnWindowUDF` is renamed to `PythonFunctionWindowUDF`. The struct was `pub` but only used within the crate; no in-tree caller breaks. Downstream Rust users importing it directly would need to update.

The `MultiColumnWindowUDF` rename is a Rust-side breaking change, so I'm adding the `api change` label even though no Python-facing API breaks.

timsaucer and others added 2 commits May 15, 2026 14:17
Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec
so a ScalarUDF defined in Python travels inside the serialized
expression (cloudpickled into fun_definition) instead of needing a
matching registration on the receiver. With that in place, Expr gains
__reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps /
pickle.loads work end-to-end on expressions built from col, lit,
built-in functions, and Python scalar UDFs.

Wire format is framed as <DFPYUDF magic, version byte, cloudpickle
tuple>; the version byte lets a too-new/too-old payload surface a
clean Execution error instead of an opaque cloudpickle unpack
failure. Schema serde is via arrow-rs's native IPC (no pyarrow
round-trip). Cloudpickle module handle is cached per-interpreter
through PyOnceLock.

Worker-side context resolution lives in a new datafusion.ipc module:
set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private
_resolve_ctx helper consulted by Expr.from_bytes. Priority is
explicit ctx > worker ctx > global SessionContext. FFI UDFs still
travel by name and require the matching registration on the
receiver's context.

Aggregate and window UDF inline encoding, the per-session
with_python_udf_inlining toggle, sender-side context, and the
user-guide docs land in follow-on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Extends the PythonLogicalCodec / PythonPhysicalCodec inline encoding
introduced for scalar UDFs to also cover Python-defined aggregate and
window UDFs. The cloudpickle tuple shape per family is:

  DFPYUDA  (agg)     (name, accumulator_factory, input_schema_bytes,
                      return_schema_bytes, state_schema_bytes,
                      volatility_str)
  DFPYUDW  (window)  (name, evaluator_factory, input_schema_bytes,
                      return_schema_bytes, volatility_str)

Same wire-framing as scalar (family magic + version byte + cloudpickle
blob), same schema serde (arrow-rs native IPC), same cached cloudpickle
handle. The agg state schema is encoded as a full IPC schema so the
post-decode UDF reports the same names + nullability + metadata as the
sender — relevant for accumulators whose StateFieldsArgs consumers key
off names rather than positional DataType.

Required restructuring two existing UDF impls so the codec can grab
the Python callable directly:

* udaf.rs: replaces create_udaf + AccumulatorFactoryFunction closure
  with a named PythonFunctionAggregateUDF that stores the Py<PyAny>
  accumulator factory. Synthesizes state_{i} field names when the
  Python constructor passes only Vec<DataType>; from_parts preserves
  the full state schema on the decode side.
* udwf.rs: renames MultiColumnWindowUDF -> PythonFunctionWindowUDF,
  drops the PartitionEvaluatorFactory PtrEq wrapper, stores the
  Py<PyAny> evaluator directly. PartialEq and Hash get the same
  pointer-identity fast path + debug-log exception handling already
  on PythonFunctionScalarUDF.

User-facing surface:

* AggregateUDF.name and WindowUDF.name properties (parallel to the
  ScalarUDF.name shipped in PR1).
* Existing UDAF/UDWF construction paths are unchanged.

The per-session with_python_udf_inlining toggle, sender-side context,
strict refusal, and user-guide docs land in PRs 3-4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>