Polars延迟API

Polars延迟API #

Polars 支持两种操作模式:立即执行模式(eager)和延迟执行模式(lazy)。到目前为止的示例都使用了立即执行 API,在这种模式下,查询会被立即执行。而在延迟执行 API 中,查询只有在调用 collect 方法时才会被求值。将执行操作推迟到最后一刻可以带来显著的性能优势,这也是为什么在大多数情况下更倾向于使用延迟执行 API。下面我们通过一个示例来演示这一点:

df = pl.read_csv("docs/assets/data/iris.csv")
df_small = df.filter(pl.col("sepal_length") > 5)
df_agg = df_small.group_by("species").agg(pl.col("sepal_width").mean())
print(df_agg)

在这个示例中,我们使用立即执行 API 来:

  1. 读取鸢尾花 数据集
  2. 根据花萼长度对数据集进行筛选。
  3. 计算每个品种的花萼宽度的平均值。

每一步都会立即执行并返回中间结果。这可能非常浪费资源,因为我们可能会做一些无用功,或者加载一些未被使用的额外数据。相反,如果我们使用延迟执行 API,并在定义好所有步骤之后再等待执行,那么查询规划器就可以执行各种优化操作。在这种情况下:

  1. 谓词下推:在读取数据集时尽早应用筛选条件,这样就只读取花萼长度大于 5 的行。
  2. 投影下推:在读取数据集时只选择所需的列,从而无需加载额外的列(例如,花瓣长度和花瓣宽度)。
q = (
    pl.scan_csv("docs/assets/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)

df = q.collect()

这些优化将显著降低内存和中央处理器(CPU)的负载,从而使你能够在内存中处理更大的数据集,并且处理速度更快。一旦定义好查询,你就调用 collect 方法来告知 Polars 你想要执行该查询。你可以在专门介绍 延迟执行 API 的章节中.了解到更多相关信息。

说明:在许多情况下,立即执行 API 实际上在底层调用了延迟执行 API,并且会立即收集结果。这样做的好处是,在查询本身的执行过程中,查询规划器仍然可以进行优化操作。

何时使用哪种模式 #

一般来说,应该优先使用延迟执行 API,除非你对中间结果感兴趣,或者正在进行探索性工作,还不确定你的查询最终会是什么样子。

预览查询计划 #

在使用延迟执行 API 时,你可以使用 explain 函数让 Polars 创建一份查询计划的描述,当你收集结果时,这份查询计划就会被执行。如果你想了解 Polars 对你的查询执行了哪些类型的优化,这个功能会很有用。我们可以让 Polars 解释一下我们上面定义的查询 q

print(q.explain())
AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM
  simple π 3/3 ["sepal_width", "species", ... 1 other column]
    Csv SCAN [docs/assets/data/iris.csv]
    PROJECT 3/5 COLUMNS
    SELECTION: [(col("sepal_length")) > (5.0)]

我们可以从解释中立刻看出,Polars 确实应用了谓词下推,因为它只读取花萼长度大于 5 的行;并且它也应用了投影下推,因为它只读取查询所需的列。

explain 函数还可以用于查看在给定模式的上下文中表达式展开会是什么样的。参考 表达式展开部分中的示例表达式:

(pl.col(pl.Float64) * 1.1).name.suffix("*1.1")

我们可以使用 explain 函数来查看这个表达式针对任意模式会如何进行求值。

schema = pl.Schema(
    {
        "int_1": pl.Int16,
        "int_2": pl.Int32,
        "float_1": pl.Float64,
        "float_2": pl.Float64,
        "float_3": pl.Float64,
    }
)

print(
    pl.LazyFrame(schema=schema)
    .select((pl.col(pl.Float64) * 1.1).name.suffix("*1.1"))
    .explain()
)
 SELECT [[(col("float_1")) * (1.1)].alias("float_1*1.1"), [(col("float_2")) * (1.1)].alias("float_2*1.1"), [(col("float_3")) * (1.1)].alias("float_3*1.1")] FROM
  DF ["int_1", "int_2", "float_1", "float_2", ...]; PROJECT["float_1", "float_2", "float_3"] 3/5 COLUMNS
logo