【发布时间】:2018-03-26 13:12:18
【问题描述】:
我的数据按列分区(例如,id),并且我将此数据集保存在某个位置。时不时地,我会得到一个具有相同结构的较小的增量数据集,我基本上必须根据我的id 用date 列来决定哪条记录是最新的,来更新我现有的数据。 (我不会把它写在同一个地方,我把整个新的 blob 保存在别的地方。)
有两种方法我一直这样做 - 要么在一个窗口中分组,要么采用最高 date 的行。或者通过dropDuplicates,依赖于我的数据是有序的事实。 (我宁愿使用前者,但我一直在尝试各种东西。)
一个大问题是每个id 组都不可忽略(几千兆字节),所以我希望 Spark(与n 工作人员)能够理解,因为我正在阅读id-partitioned 数据和写入id-partitioned 数据,它会立即处理n ids 并不断将它们写入我的存储,并在完成之前的ids 后获取新的ids。
不幸的是,似乎正在发生的事情是,在将任何内容写入磁盘之前,Spark 在一项大工作中处理了我所有的id 组(并且自然会溢出到磁盘)。它变得非常非常慢。
问题是这样的:有没有办法强制 Spark 处理这些组并在它们准备好后立即写入它们?同样,它们是分区的,所以没有其他任务会影响我的分区。
这里有一段代码可以重现问题:
# generate dummy data first
import random
from typing import List
from datetime import datetime, timedelta
from pyspark.sql.functions import desc, col, row_number
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame
def gen_data(n: int) -> List[tuple]:
names = 'foo, bar, baz, bak'.split(', ')
return [(random.randint(1, 25), random.choice(names), datetime.today() - timedelta(days=random.randint(1, 100))) \
for j in range(n)]
def get_df(n: int) -> DataFrame:
return spark.createDataFrame(gen_data(n), ['id', 'name', 'date'])
n = 10_000
df = get_df(n)
dd = get_df(n*10)
df.write.mode('overwrite').partitionBy('id').parquet('outputs/first')
dd.write.mode('overwrite').partitionBy('id').parquet('outputs/second')
d1 和 d2 都被 id 分区,结果数据集也是如此,但它没有反映在计划中:
w = Window().partitionBy('id').orderBy(desc('date'))
d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')
d1.union(d2).\
withColumn('rn', row_number().over(w)).filter(col('rn') == 1).drop('rn').\
write.mode('overwrite').partitionBy('id').parquet('outputs/window')
我也尝试过显式声明分区键(否则代码相同):
d1 = spark.read.parquet('outputs/first').repartition('id')
d2 = spark.read.parquet('outputs/second').repartition('id')
d1.union(d2).\
withColumn('rn', row_number().over(w)).filter(col('rn') == 1).drop('rn').\
write.mode('overwrite').partitionBy('id').parquet('outputs/window2')
这里使用dropDuplicates也是一样的:
d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')
d1.union(d2).\
dropDuplicates(subset=['id']).\
write.mode('overwrite').partitionBy('id').parquet('outputs/window3')
我还尝试强调我的工会仍然使用这样的方式进行分区,但同样无济于事:
df.union(d2).repartition('id').\
.withColumn...
我可以列出所有分区 (ids),在利用分区修剪、重复数据删除和写入的同时将它们一一加载。但这似乎是不必要的额外样板。或者可以通过foreach做到这一点吗?
更新(2018-03-27):
事实证明,关于分区的信息确实以一种或另一种方式存在于窗口功能中,因为当我在最后进行过滤时,确实会对输入进行分区修剪:
d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')
w = Window().partitionBy('id', 'name').orderBy(desc('date'))
d1.union(d2).withColumn('rn', row_number().over(w)).filter(col('rn') == 1).filter(col('id') == 12).explain(True)
结果
== Physical Plan ==
*(4) Filter (isnotnull(rn#387) && (rn#387 = 1))
+- Window [row_number() windowspecdefinition(id#187, name#185, date#186 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#387], [id#187, name#185], [date#186 DESC NULLS LAST]
+- *(3) Sort [id#187 ASC NULLS FIRST, name#185 ASC NULLS FIRST, date#186 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(id#187, name#185, 200)
+- Union
:- *(1) FileScan parquet [name#185,date#186,id#187] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/.../spark_perf_partitions/outputs..., PartitionCount: 1, PartitionFilters: [isnotnull(id#187), (id#187 = 12)], PushedFilters: [], ReadSchema: struct<name:string,date:timestamp>
+- *(2) FileScan parquet [name#191,date#192,id#193] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/.../spark_perf_partitions/outputs..., PartitionCount: 1, PartitionFilters: [isnotnull(id#193), (id#193 = 12)], PushedFilters: [], ReadSchema: struct<name:string,date:timestamp>
所以它确实只读取两个分区,每个文件一个。所以我可以,而不是循环,一次只运行一个过滤器的代码(过滤器在窗口函数和.write之间)。乏味且不太实用,但可能比将所有内容溢出到磁盘更快。
【问题讨论】:
标签: apache-spark spark-dataframe distributed-computing partitioning