干货分享|Dask DataFrame 快速入门

摘要:使用 Dask 内置方法创建一个名为 ddf 的 DataFrame。这是一个随机生成的时间序列数据,每个数据样本代表一秒钟,总共包含 4 天的数据(2024-01-01 0:00~2024-01-05 0:00)。

我们先使用 Dask DataFrame 演示如何通过 Dask 并行化 Pandas DataFrame。

1.1 创建 Dask DataFrame

使用 Dask 内置方法创建一个名为 ddf 的 DataFrame。这是一个随机生成的时间序列数据,每个数据样本代表一秒钟,总共包含 4 天的数据(2024-01-01 0:00~2024-01-05 0:00)。

Import daskddf = dask.datasets.timeseries(start="2024-01-01",end="2024-01-05")ddf

运行结果如图所示。

Pandas 中的所有操作都是立即(Eager)执行的。而 Dask 是延迟(Lazy)执行的,此时数据尚未开始计算,因此都用省略号“„”表示。

虽然 ddf 的数据尚未被计算,但 Dask 已经获取了数据的列名和数据类型。可以通过 dtypes查看列的信息:

ddf.dtypes

运行结果如图所示。

1.2 执行计算

如果想计算并得到结果,必须使用 compute手动触发计算。

ddf.compute

运行结果如图所示。

Dask DataFrame 有一个重要的内置变量npartitions,它表示数据被切分成多少份,或者说一共有多少个分区(Partition)。

如图所示

Dask DataFrame 由多个 Pandas DataFrame 组成,每个 Pandas DataFrame 被称为一个分区。

ddf.npartitions

运行结果如图所示。

每个分区都有上界和下界。在这个例子中,ddf 是根据时间列进行切分的,每天的数据构成一个分区。内置变量 divisions 存放每个分区的分界线:

ddf.divisions

运行结果如图所示。

1.3 索引

本例中,ddf 的索引是时间。每个分区基于索引列进行切分。整个 ddf 包含 4 天的数据,每个分区包含一天的数据。

现在我们选择 2024-01-01 0:00~2024-01-02 5:00 的数据,这个时间范围横跨了两天,即两个分区。可以使用以下命令进行选择:

Ddf["2024-01-01 0:00": "2024-01-02 5:00"]

运行结果如图所示。

仍然需要使用 compute来触发计算,得到结果:

ddf["2024-01-01 0:00": "2024-01-02 5:00"].compute

运行结果如图所示。

1.4 Pandas 兼容

Dask DataFrame 的大部分操作与 Pandas 几乎一致,我们可以像使用 Pandas 那样使用 DaskDataFrame。例如,可以进行数据过滤和分组操作(groupby):

ddf2 = ddf[ddf.y > 0]ddf3 = ddf2.groupby("name").x.stdddf3

运行结果如图所示。

现在的结果仍然用省略号(„)表示,因为计算是延迟执行的,需要调用 compute来触发实际计算:

Computed_ddf = ddf3.computecomputed_ddf

运行结果如图所示。

1.5 计算图

至此,我们已经了解到 Dask DataFrame 会将大数据切分成分区,并且采用延迟执行的方式。在 Dask 构建了 Task Graph,分别对每个分区进行计算。

执行 compute之前,Dask 会构建一个任务计算图(Task Graph),并使用 visualize函数可视化任务计算图(Task Graph):

Ddf3.visualize(filename="../img/ch-dask/visualize-task-graph", format="svg")

在计算图中,圆圈表示计算任务,长方形表示数据对象。对于 Dask DataFrame 来说,长方形代表的是 Pandas DataFrame,如图所示。

本篇文章就先到这这结束啦,后续还有其他干货文章大家点个关注不迷路哦!

来源:闲聊阁主

相关推荐