Examples of DataFrames with UDFs
Scenario
In AI data engineering, data preprocessing is a crucial step, typically involving complex cleansing, transformation, and feature engineering operations on data stored in databases. However, traditional data preprocessing logic is often implemented externally through Python scripts, leading to significant data transfer between the database and the Python environment. This not only increases computational overhead but also fails to fully utilize the distributed computing capabilities of the database. By implementing Python UDFs within the database kernel and enhancing ibis-fabric with both explicit and implicit Python UDF invocation interfaces, we can embed data preprocessing logic directly into database queries, thereby reducing data transfer and boosting overall processing efficiency.
For complex business logic, such as filtering a feature table row by row with a machine learning model, a function-style UDF definition cannot adequately support model initialization and resource management. Model hyperparameters and resources (for example, a loaded model file) must be configured once before the function runs, and a function-style UDF has no natural place for this initialization or for the matching resource release. Defining a Python UDF as a class lets users configure model hyperparameters and initialize resources (such as loading a model) in the __init__ method, run inference logic in the process method, and release resources (such as closing files or connections) in the __del__ method, which keeps resource management and business logic together.
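The three-phase lifecycle described above can be sketched in plain Python, without a database connection. This is a minimal illustration only: ThresholdFilter, its stand-in weights, and the run_locally driver are hypothetical names invented here, and the driver merely imitates the order in which __init__, process, and __del__ would be expected to run; it is not how ibis-fabric invokes a class UDF.

```python
class ThresholdFilter:
    """Hypothetical class-style UDF that scores a row and filters on a threshold."""

    def __init__(self, threshold: float = 0.5) -> None:
        # One-time setup: hyperparameters and a stand-in for loading a model file.
        self.threshold = threshold
        self.weights = [0.2, 0.8]

    def process(self, f1: float, f2: float) -> bool:
        # Per-row inference: reuses the state prepared in __init__.
        score = self.weights[0] * f1 + self.weights[1] * f2
        return score >= self.threshold

    def __del__(self) -> None:
        # Resource release: a real UDF might close files or connections here.
        self.weights = None


def run_locally(udf_cls, init_kwargs, rows):
    """Hypothetical local driver: construct once, call process per row, then release."""
    udf = udf_cls(**init_kwargs)
    try:
        return [udf.process(*row) for row in rows]
    finally:
        del udf


rows = [(1.0, 0.0), (0.0, 1.0), (0.9, 0.9)]
kept = run_locally(ThresholdFilter, {"threshold": 0.5}, rows)
```

Running a class this way before registration is one way to confirm that the expensive setup happens once rather than once per row.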
Constraints
- Constraints on Python UDFs include:
  - Only user-defined functions written in Python are supported.
  - Users are responsible for managing Python environment dependencies. Compatibility among library versions must be ensured.
  - The Python version must be 3.11 in the runtime environment.
- Constraints on Python class UDFs include:
  - Classes must define an __init__ method whose parameter names match those used during calls.
  - A process member method must be defined within the class, serving as the primary entry point for the UDF.
  - A __del__ member method can be defined within the class as needed; it must not take input parameters.
  - When a class UDF is called, only constant values can be passed to the __init__ method; data columns or other expressions are unsupported.
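The structural constraints on class UDFs listed above can be checked locally before registration. The sketch below is an assumption-laden illustration: check_class_udf and check_init_arguments are hypothetical helpers written for this page, not part of ibis-fabric, and they cover only the method-shape and parameter-name rules. They do not verify the constant-only rule or the Python version.

```python
import inspect


def check_class_udf(cls) -> list:
    """Hypothetical helper: list violations of the class UDF structure rules."""
    problems = []
    if cls.__init__ is object.__init__:
        problems.append("missing __init__")
    if not callable(getattr(cls, "process", None)):
        problems.append("missing process")
    finalizer = getattr(cls, "__del__", None)
    if finalizer is not None:
        # Only the instance parameter is allowed on __del__.
        if len(inspect.signature(finalizer).parameters) != 1:
            problems.append("__del__ must not take input parameters")
    return problems


def check_init_arguments(cls, **kwargs) -> list:
    """Hypothetical helper: check that call-site names match __init__ parameters."""
    params = inspect.signature(cls.__init__).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []  # __init__ accepts arbitrary keyword names
    accepted = set(params) - {"self"}
    return [f"unknown __init__ parameter: {name}" for name in kwargs if name not in accepted]


class Good:
    def __init__(self, model_file: str) -> None:
        self.model_file = model_file

    def process(self, text: str) -> str:
        return text.upper()

    def __del__(self) -> None:
        pass


class Bad:  # never instantiated; exists only to trigger each check
    def process_row(self, text):
        return text

    def __del__(self, reason):
        pass


good_report = check_class_udf(Good)
bad_report = check_class_udf(Bad)
name_report = check_init_arguments(Good, path="test_model.model")
```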
Using a scalar UDF together with a DataFrame is the recommended standard approach. In this case, the scalar UDF call must be placed inside the SELECT method of the DataFrame.
Registration returns a UDF operator within DataFrame. This operator can then be invoked multiple times by different DataFrame expressions, as shown below:
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# Explicitly register transform_json.
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# First use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# Second use of transform_json combined with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()
Class-based scalar UDFs are used slightly differently from function-based scalar UDFs: the UDF operator takes the arguments for the process method, whereas with_arguments takes the parameters for the __init__ method. A complete example is as follows:
import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence

class Base(abc.ABC):
    def __init__(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def process(self, *args, **kwargs):
        ...

    def __del__(self):
        pass

class SPManager(Base):
    def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
        sp = SentencePieceProcessor(model_file=model_file)
        opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
        sp.set_encode_extra_options(opts)
        self.spm = sp

    def process(self, input_text: str) -> Sequence[str]:
        return self.spm.encode(input_text, out_type=str)

con = ibis.fabric.connect(...)

# Explicitly register SPManager.
sentencepiece_udf = con.udf.python.register(
    SPManager,
    database="your-database",
    imports=['test_model.model'],
    packages=['sentencepiece'],
    register_type=RegisterType.OBS
)

# Use SPManager with the SELECT method of DataFrame.
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()