【发布时间】:2020-04-21 22:05:46
【问题描述】:
我正在探索 Spark 在将表连接到自身时的行为。我正在使用 Databricks。
我的虚拟场景是:
将外部表读取为数据帧 A(基础文件为增量格式)
将数据帧 B 定义为数据帧 A,仅选择某些列
在 column1 和 column2 上加入数据帧 A 和 B
(是的,没有多大意义,我只是在尝试理解 Spark 的底层机制)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
我的第一次尝试是按原样运行代码(尝试 1)。然后我尝试重新分区和缓存(尝试 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
最后,我重新分区、排序和缓存
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
生成的各个dag如附件。
我的问题是:
为什么在尝试 1 中,即使未明确指定缓存,表似乎也已缓存。
为什么 InMemoreTableScan 后面总是跟另一个这种类型的节点。
为什么尝试 3 缓存似乎发生在两个阶段?
为什么在尝试 3 WholeStageCodegen 后跟一个(并且只有一个)InMemoreTableScan。
【问题讨论】:
-
我怀疑当源是外部表时,DataFrame 读取器会自动缓存数据。我有类似的情况,我正在从数据库表中读取数据,而正在下载“应用程序详细信息 UI”上的“SQL”选项卡显示正在下载的行数,但在指定位置尚未保存文件.我猜它知道计数,因为它在某处缓存了数据,这就是 DAG 上显示的内容。如果您从本地文本文件中读取数据,那么您将看不到缓存状态。
标签: apache-spark pyspark bigdata azure-databricks delta-lake