摘要:使用 Dask 内置方法创建一个名为 ddf 的 DataFrame。这是一个随机生成的时间序列数据,每个数据样本代表一秒钟,总共包含 4 天的数据(2024-01-01 0:00~2024-01-05 0:00)。
我们先使用 Dask DataFrame 演示如何通过 Dask 并行化 Pandas DataFrame。
1.1 创建 Dask DataFrame
使用 Dask 内置方法创建一个名为 ddf 的 DataFrame。这是一个随机生成的时间序列数据,每个数据样本代表一秒钟,总共包含 4 天的数据(2024-01-01 0:00~2024-01-05 0:00)。
Import daskddf = dask.datasets.timeseries(start="2024-01-01",end="2024-01-05")ddf运行结果如图所示。
Pandas 中的所有操作都是立即(Eager)执行的。而 Dask 是延迟(Lazy)执行的,此时数据尚未开始计算,因此都用省略号“„”表示。
虽然 ddf 的数据尚未被计算,但 Dask 已经获取了数据的列名和数据类型。可以通过 dtypes查看列的信息:
ddf.dtypes运行结果如图所示。
1.2 执行计算
如果想计算并得到结果,必须使用 compute手动触发计算。
ddf.compute运行结果如图所示。
Dask DataFrame 有一个重要的内置变量npartitions,它表示数据被切分成多少份,或者说一共有多少个分区(Partition)。
如图所示
Dask DataFrame 由多个 Pandas DataFrame 组成,每个 Pandas DataFrame 被称为一个分区。
ddf.npartitions运行结果如图所示。
每个分区都有上界和下界。在这个例子中,ddf 是根据时间列进行切分的,每天的数据构成一个分区。内置变量 divisions 存放每个分区的分界线:
ddf.divisions运行结果如图所示。
1.3 索引
本例中,ddf 的索引是时间。每个分区基于索引列进行切分。整个 ddf 包含 4 天的数据,每个分区包含一天的数据。
现在我们选择 2024-01-01 0:00~2024-01-02 5:00 的数据,这个时间范围横跨了两天,即两个分区。可以使用以下命令进行选择:
Ddf["2024-01-01 0:00": "2024-01-02 5:00"]运行结果如图所示。
仍然需要使用 compute来触发计算,得到结果:
ddf["2024-01-01 0:00": "2024-01-02 5:00"].compute运行结果如图所示。
1.4 Pandas 兼容
Dask DataFrame 的大部分操作与 Pandas 几乎一致,我们可以像使用 Pandas 那样使用 DaskDataFrame。例如,可以进行数据过滤和分组操作(groupby):
ddf2 = ddf[ddf.y > 0]ddf3 = ddf2.groupby("name").x.stdddf3运行结果如图所示。
现在的结果仍然用省略号(„)表示,因为计算是延迟执行的,需要调用 compute来触发实际计算:
Computed_ddf = ddf3.computecomputed_ddf运行结果如图所示。
1.5 计算图
至此,我们已经了解到 Dask DataFrame 会将大数据切分成分区,并且采用延迟执行的方式。在 Dask 构建了 Task Graph,分别对每个分区进行计算。
执行 compute之前,Dask 会构建一个任务计算图(Task Graph),并使用 visualize函数可视化任务计算图(Task Graph):
Ddf3.visualize(filename="../img/ch-dask/visualize-task-graph", format="svg")在计算图中,圆圈表示计算任务,长方形表示数据对象。对于 Dask DataFrame 来说,长方形代表的是 Pandas DataFrame,如图所示。
本篇文章就先到这这结束啦,后续还有其他干货文章大家点个关注不迷路哦!
来源:闲聊阁主