更新时间:2025-09-18 GMT+08:00

带Scalar UDF的DF示例

场景描述

在AI数据工程中,数据预处理是一个关键步骤,通常需要对存储在数据库中的数据进行复杂的清洗、转换和特征工程操作。然而,传统的数据预处理逻辑往往在数据库外部通过Python脚本实现,这会导致大量数据在数据库和Python环境之间传输,不仅增加了计算开销,还无法充分利用数据库的分布式计算能力。通过在数据库内核中实现Python UDF,并在ibis-fabric中增加Python UDF显式,隐式调用接口,可以将数据预处理逻辑直接嵌入到数据库查询中,减少数据传输,提升整体处理效率。

对于复杂业务逻辑,例如使用机器学习模型对特征数据表进行按行过滤。传统函数式UDF定义方式无法有效支持模型初始化和资源管理需求,因为模型超参数和资源(如模型文件加载)需要在函数执行前一次性配置,而函数式UDF无法高效管理此类初始化逻辑和资源释放操作。通过以Class形式定义Python UDF,用户可以在__init__方法中配置模型超参数和初始化资源(如加载模型),并在process方法中执行推理逻辑,同时通过__del__方法释放资源(如关闭文件或连接),从而实现高效的资源管理和业务逻辑处理。

约束限制

  • Python UDF功能约束限制如下:
    • 仅支持Python语言编写的用户自定义函数。
    • Python环境依赖管理由用户自行负责,需确保各依赖库版本兼容。
    • 运行时环境要求Python版本为3.11。
  • Python Class UDF功能约束限制如下:
    • 类必须定义__init__方法,且其参数名称需与调用时的参数名称保持一致。
    • 类必须定义process成员方法,该方法为UDF的主函数入口。
    • 类可按需定义__del__成员方法,且该方法不支持传入参数。
    • 调用class型UDF时,__init__方法的参数仅支持传入常量,不支持传入表的数据列或其他表达式。

结合DataFrame使用Scalar UDF是推荐的标准用法,此时整个Scalar UDF的外部必须要包围DataFrame的SELECT方法。

经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,示例如下:

import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType

def transform_json(ts: float, msg: str) -> str:
    import json
    from car_bu.parser_core import ParseObjects, dict_to_object
    if msg == '0_msg':
        return json.dumps({"time_stamp": 0.0, "msg": {}})
    else:
        d = dict_to_object(json.loads(msg))
        return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)})

con = ibis.fabric.connect(...)

# 显式注册transform_json
transform_json_udf = con.udf.python.register(
    transform_json,
    database="your-database",
    imports=['car_bu/parser_core.py'],
    packages=['json'],
    register_type=RegisterType.OBS
)

# 结合DataFrame的SELECT方法,第一次使用transform_json
t = con.table("your-table", database="your-database")
expression = t.select(transform_json_udf(t.ts, t.msg).name("json column"))
df = expression.execute()

# 结合DataFrame的SELECT方法,第二次使用transform_json
t = con.table("your-table", database="your-database")
filtered = t.filter(...)
local_view = filtered.select(...).mutate(...)
span_base = local_view.select(...).filter(...)
span_part2 = ibis.memtable(...)
union_part = span_base.union(span_part2)
window = ibis.window(...)
final_span = union_part.union(...).order_by(...).select(...)
result = final_span.mutate(...).select(...).filter(...).order_by(...)
result = result.select(transform_json_udf(result.ts, result.msg).name("json column"))
df = result.execute()

对于Python类的Scalar Class UDF,使用方式和Python函数的Scalar UDF稍有不同:UDF算子接受process方法参数;with_arguments接受__init__方法参数。完整示例如下:

import abc
import ibis
import ibis_fabric as fabric
from ibis_fabric.udf import RegisterType
from sentencepiece import SentencePieceProcessor
from typing import Sequence

class Base(abc.ABC):
    def __init__(self, *args, **kwargs): pass

    @abc.abstractmethod
    def process(self, *args, **kwargs): ...

    def __del__(self): pass

class SPManager(Base):
    def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None:
        sp = SentencePieceProcessor(model_file=model_file)
        opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag)
        sp.set_encode_extra_options(opts)
        self.spm = sp

    def process(self, input_text: str) -> Sequence[str]:
        return self.spm.encode(input_text, out_type=str)

con = ibis.fabric.connect(...)

# 显式注册SPManager
sentencepiece_udf = con.udf.python.register(
    SPManager,
    database="your-database",
    imports=['test_model.model'],
    packages=['sentencepiece'],
    register_type=RegisterType.OBS
)

# 结合DataFrame的SELECT方法,使用SPManager
t = con.table("your-table", database="your-database")
expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column"))
df = expression.execute()