Updated on 2025-09-18 GMT+08:00

Examples of DataFrames with UDFs

Scenario

In AI data engineering, data preprocessing is a crucial step, typically involving complex cleansing, transformation, and feature engineering operations on data stored in databases. However, traditional data preprocessing logic is often implemented externally through Python scripts, leading to significant data transfer between the database and the Python environment. This not only increases computational overhead but also fails to fully utilize the distributed computing capabilities of the database. By implementing Python UDFs within the database kernel and enhancing ibis-fabric with both explicit and implicit Python UDF invocation interfaces, we can embed data preprocessing logic directly into database queries, thereby reducing data transfer and boosting overall processing efficiency.

For complex service logics, such as filtering feature data tables row-by-row using machine learning models, traditional functional UDF definitions fall short in supporting model initialization and resource management needs. Since model hyperparameters and resources (for example, model file loading) require one-time configuration prior to function execution, functional UDFs struggle to efficiently handle these initialization logics and resource deallocation operations. Defining Python UDFs in a class format allows users to configure model hyperparameters and initialize resources (like loading a model) in the __init__ method, execute inference logic in the process method, and release resources (such as closing files or connections) via the __del__ method, thus achieving effective resource management and service logic processing.

Constraints

  • Constraints on Python UDFs include:
    • Only user-defined functions written in Python are supported.
    • Users are responsible for managing Python environment dependencies. Compatibility among library versions must be ensured.
    • The Python version must be 3.11 in the runtime environment.
  • Constraints on Python class UDFs include:
    • Classes must define an __init__ method whose parameter names match those used during calls.
    • A process member method must be defined within the class, serving as the primary entry point for the UDF.
    • A __del__ member method can be defined within a class as needed, and this method does not support input parameters.
    • When a class UDF is called, only constant values can be passed to the __init__ method—data columns or other expressions are unsupported.

Using Scalar UDF in combination with DataFrame is the recommended standard approach. In this case, the entire Scalar UDF must encompass the SELECT method of DataFrame.

After registration, the returned value is a UDF operator within DataFrame. This operator can then be invoked multiple times by various DataFrame expressions, as shown below:

import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# Explicitly register transform_json.
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# First use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# Second use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()

Usage differs slightly between scalar class UDFs of Python classes and scalar UDFs of Python functions: the UDF operator accepts arguments for the process method whereas with_arguments takes parameters intended for the __init__. A complete example is as follows:

import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence

class Base(abc.ABC):
    def __init__(self, *args, **kwargs): pass

    @abc.abstractmethod
    def process(self, *args, **kwargs): ...

    def __del__(self): pass

class SPManager(Base):
    def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
        sp = SentencePieceProcessor(model_file=model_file)
        opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
        sp.set_encode_extra_options(opts)
        self.spm = sp

    def process(self, input_text: str) -> Sequence[str]:
        return self.spm.encode(input_text, out_type=str)

con = ibis.fabric.connect(...)

# Explicitly register SPManager.
sentencepiece_udf = con.udf.python.register(
    SPManager,
    database="your-database",
    imports=['test_model.model'],
    packages=['sentencepiece'],
    register_type=RegisterType.OBS
)

# Use SPManager with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()