更新时间:2025-09-18 GMT+08:00

不带UDF的DF示例

下文以tpch的query1为例,展示DataFrame的用法。

查询SQL为:

SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) AS sum_qty,
    sum(l_extendedprice) AS sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    avg(l_quantity) AS avg_qty,
    avg(l_extendedprice) AS avg_price,
    avg(l_discount) AS avg_disc,
    count(*) AS count_order
FROM
    lineitem
WHERE
    l_shipdate <= CAST('1998-09-02' AS date)
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,    l_linestatus;

对应的DataFrame逻辑如下:

import ibis  # 导入ibis依赖

con = ibis.fabric.connect(  # 调用Fabric后端连接,创建连接
    endpoint=FABRIC_ENDPOINT,  # 指定服务的区域,区域查询请参见地区和终端节点
    endpoint_id=FABRIC_ENDPOINT_ID,  # 查询endpoint_id
    access_key=ACCESS_KEY,  # 获取AK/SK
    secret_key=SECRET_KEY,
    project_id=FABRIC_PROJECT_ID,  # 如何获取project_id
    catalog_name=IBIS_TEST_FABRIC_CATELOG,  # 连接指定的Catalog
    workspace_id=FABRIC_WORKSPACE_ID,  # 获取workspace_id
    lakeformation_instance_id=IBIS_TEST_FABRIC_LAKEFORMATION_INSTANCE_ID,  # LakeFormation服务的实例ID,详情请参见与LakeFormation数据类型映射关系
    obs_directory_base=OBS_DIRECTORY_BASE,  # obs中udf的存储路径
    obs_bucket_name=OBS_BUCKET_NAME,  # obs的桶名字
    obs_server=OBS_SERVER,  # obs访问地址,详情请参见终端节点(Endpoint)和访问域名
)
t = con.table("lineitem", database="tpch")  # 通过连接到后端获取table表信息,建立表对象
q = t.filter(t.l_shipdate <= add_date("1998-12-01", dd=-90))
discount_price = t.l_extendedprice * (1 - t.l_discount)
charge = discount_price * (1 + t.l_tax)
q = q.group_by(["l_returnflag", "l_linestatus"])
q = q.aggregate(
    sum_qty=t.l_quantity.sum(),
    sum_base_price=t.l_extendedprice.sum(),
    sum_disc_price=discount_price.sum(),
    sum_charge=charge.sum(),
    avg_qty=t.l_quantity.mean(),
    avg_price=t.l_extendedprice.mean(),
    avg_disc=t.l_discount.mean(),
    count_order=lambda t: t.count(),
)
q = q.order_by(["l_returnflag", "l_linestatus"])
sql = q.compile()  # 将DataFrame编译为sql字符串
df = q.execute()  # 执行表达式并且返回结果集