带Scalar UDF的DF示例
场景描述
在AI数据工程中,数据预处理是一个关键步骤,通常需要对存储在数据库中的数据进行复杂的清洗、转换和特征工程操作。然而,传统的数据预处理逻辑往往在数据库外部通过Python脚本实现,这会导致大量数据在数据库和Python环境之间传输,不仅增加了计算开销,还无法充分利用数据库的分布式计算能力。通过在数据库内核中实现Python UDF,并在ibis-fabric中增加Python UDF显式,隐式调用接口,可以将数据预处理逻辑直接嵌入到数据库查询中,减少数据传输,提升整体处理效率。
对于复杂业务逻辑,例如使用机器学习模型对特征数据表进行按行过滤。传统函数式UDF定义方式无法有效支持模型初始化和资源管理需求,因为模型超参数和资源(如模型文件加载)需要在函数执行前一次性配置,而函数式UDF无法高效管理此类初始化逻辑和资源释放操作。通过以Class形式定义Python UDF,用户可以在__init__方法中配置模型超参数和初始化资源(如加载模型),并在process方法中执行推理逻辑,同时通过__del__方法释放资源(如关闭文件或连接),从而实现高效的资源管理和业务逻辑处理。
约束限制
- Python UDF功能约束限制如下:
- 仅支持Python语言编写的用户自定义函数。
- Python环境依赖管理由用户自行负责,需确保各依赖库版本兼容。
- 运行时环境要求Python版本为3.11。
- Python Class UDF功能约束限制如下:
- 类必须定义__init__方法,且其参数名称需与调用时的参数名称保持一致。
- 类必须定义process成员方法,该方法为UDF的主函数入口。
- 类可按需定义__del__成员方法,且该方法不支持传入参数。
- 调用class型UDF时,__init__方法的参数仅支持传入常量,不支持传入表的数据列或其他表达式。
结合DataFrame使用Scalar UDF是推荐的标准用法,此时整个Scalar UDF的外部必须要包围DataFrame的SELECT方法。
经过注册后返回的值是DataFrame中的一个UDF算子。此时,该算子可以被多个DataFrame表达式多次调用,示例如下:
import ibis import ibis_fabric as fabric from ibis_fabric.udf import RegisterType def transform_json(ts: float, msg: str) -> str: import json from car_bu.parser_core import ParseObjects, dict_to_object if msg == '0_msg': return json.dumps({"time_stamp": 0.0, "msg": {}}) else: d = dict_to_object(json.loads(msg)) return json.dumps({"time_stamp": ts/10, "msg": ParseObjects(d)}) con = ibis.fabric.connect(...) # 显式注册transform_json transform_json_udf = con.udf.python.register( transform_json, database="your-database", imports=['car_bu/parser_core.py'], packages=['json'], register_type=RegisterType.OBS ) # 结合DataFrame的SELECT方法,第一次使用transform_json t = con.table("your-table", database="your-database") expression = t.select(transform_json_udf(t.ts, t.msg).name("json column")) df = expression.execute() # 结合DataFrame的SELECT方法,第二次使用transform_json t = con.table("your-table", database="your-database") filtered = t.filter(...) local_view = filtered.select(...).mutate(...) span_base = local_view.select(...).filter(...) span_part2 = ibis.memtable(...) union_part = span_base.union(span_part2) window = ibis.window(...) final_span = union_part.union(...).order_by(...).select(...) result = final_span.mutate(...).select(...).filter(...).order_by(...) result = result.select(transform_json_udf(result.ts, result.msg).name("json column")) df = result.execute()
对于Python类的Scalar Class UDF,使用方式和Python函数的Scalar UDF稍有不同:UDF算子接受process方法参数;with_arguments接受__init__方法参数。完整示例如下:
import abc import ibis import ibis_fabric as fabric from ibis_fabric.udf import RegisterType from sentencepiece import SentencePieceProcessor from typing import Sequence class Base(abc.ABC): def __init__(self, *args, **kwargs): pass @abc.abstractmethod def process(self, *args, **kwargs): ... def __del__(self): pass class SPManager(Base): def __init__(self, model_file: str, *, bos: bool = False, eos: bool = False, reverse: bool = False) -> None: sp = SentencePieceProcessor(model_file=model_file) opts = ":".join(opt for opt, flag in (("bos", bos), ("eos", eos), ("reverse", reverse)) if flag) sp.set_encode_extra_options(opts) self.spm = sp def process(self, input_text: str) -> Sequence[str]: return self.spm.encode(input_text, out_type=str) con = ibis.fabric.connect(...) # 显式注册SPManager sentencepiece_udf = con.udf.python.register( SPManager, database="your-database", imports=['test_model.model'], packages=['sentencepiece'], register_type=RegisterType.OBS ) # 结合DataFrame的SELECT方法,使用SPManager t = con.table("your-table", database="your-database") expression = t.select(sentencepiece_udf(t.data).with_arguments(model_file="test_model.model", bos=True, eos=True).name("pieces column")) df = expression.execute()