【问题标题】:Spark: does DataFrameWriter have to be a blocking step?Spark:DataFrameWriter 必须是一个阻塞步骤吗?
【发布时间】:2018-03-26 13:12:18
【问题描述】:

我的数据按列分区(例如,id),并且我将此数据集保存在某个位置。时不时地,我会得到一个具有相同结构的较小的增量数据集,我基本上必须根据我的iddate 列来决定哪条记录是最新的,来更新我现有的数据。 (我不会把它写在同一个地方,我把整个新的 blob 保存在别的地方。)

有两种方法我一直这样做 - 要么在一个窗口中分组,要么采用最高 date 的行。或者通过dropDuplicates,依赖于我的数据是有序的事实。 (我宁愿使用前者,但我一直在尝试各种东西。)

一个大问题是每个id 组都不可忽略(几千兆字节),所以我希望 Spark(与n 工作人员)能够理解,因为我正在阅读id-partitioned 数据和写入id-partitioned 数据,它会立即处理n ids 并不断将它们写入我的存储,并在完成之前的ids 后获取新的ids。

不幸的是,似乎正在发生的事情是,在将任何内容写入磁盘之前,Spark 在一项大工作中处理了我所有的id 组(并且自然会溢出到磁盘)。它变得非常非常慢。

问题是这样的:有没有办法强制 Spark 处理这些组并在它们准备好后立即写入它们?同样,它们是分区的,所以没有其他任务会影响我的分区。


这里有一段代码可以重现问题:

# generate dummy data first
import random
from typing import List
from datetime import datetime, timedelta

from pyspark.sql.functions import desc, col, row_number
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame

def gen_data(n: int) -> List[tuple]:
    names = 'foo, bar, baz, bak'.split(', ')
    return [(random.randint(1, 25), random.choice(names), datetime.today() - timedelta(days=random.randint(1, 100))) \
          for j in range(n)]

def get_df(n: int) -> DataFrame:
    return spark.createDataFrame(gen_data(n), ['id', 'name', 'date'])

n = 10_000
df = get_df(n)
dd = get_df(n*10)

df.write.mode('overwrite').partitionBy('id').parquet('outputs/first')
dd.write.mode('overwrite').partitionBy('id').parquet('outputs/second')

d1d2 都被 id 分区,结果数据集也是如此,但它没有反映在计划中:

w = Window().partitionBy('id').orderBy(desc('date'))

d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')

d1.union(d2).\
  withColumn('rn', row_number().over(w)).filter(col('rn') == 1).drop('rn').\
  write.mode('overwrite').partitionBy('id').parquet('outputs/window')

我也尝试过显式声明分区键(否则代码相同):

d1 = spark.read.parquet('outputs/first').repartition('id')
d2 = spark.read.parquet('outputs/second').repartition('id')

d1.union(d2).\
  withColumn('rn', row_number().over(w)).filter(col('rn') == 1).drop('rn').\
  write.mode('overwrite').partitionBy('id').parquet('outputs/window2')

这里使用dropDuplicates也是一样的:

d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')

d1.union(d2).\
  dropDuplicates(subset=['id']).\
  write.mode('overwrite').partitionBy('id').parquet('outputs/window3')

我还尝试强调我的工会仍然使用这样的方式进行分区,但同样无济于事:

df.union(d2).repartition('id').\
  .withColumn...

我可以列出所有分区 (ids),在利用分区修剪、重复数据删除和写入的同时将它们一一加载。但这似乎是不必要的额外样板。或者可以通过foreach做到这一点吗?


更新(2018-03-27):

事实证明,关于分区的信息确实以一种或另一种方式存在于窗口功能中,因为当我在最后进行过滤时,确实会对输入进行分区修剪:

d1 = spark.read.parquet('outputs/first')
d2 = spark.read.parquet('outputs/second')

w = Window().partitionBy('id', 'name').orderBy(desc('date'))

d1.union(d2).withColumn('rn', row_number().over(w)).filter(col('rn') == 1).filter(col('id') == 12).explain(True)

结果

== Physical Plan ==
*(4) Filter (isnotnull(rn#387) && (rn#387 = 1))
+- Window [row_number() windowspecdefinition(id#187, name#185, date#186 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#387], [id#187, name#185], [date#186 DESC NULLS LAST]
   +- *(3) Sort [id#187 ASC NULLS FIRST, name#185 ASC NULLS FIRST, date#186 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#187, name#185, 200)
         +- Union
            :- *(1) FileScan parquet [name#185,date#186,id#187] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/.../spark_perf_partitions/outputs..., PartitionCount: 1, PartitionFilters: [isnotnull(id#187), (id#187 = 12)], PushedFilters: [], ReadSchema: struct<name:string,date:timestamp>
            +- *(2) FileScan parquet [name#191,date#192,id#193] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/.../spark_perf_partitions/outputs..., PartitionCount: 1, PartitionFilters: [isnotnull(id#193), (id#193 = 12)], PushedFilters: [], ReadSchema: struct<name:string,date:timestamp>

所以它确实只读取两个分区,每个文件一个。所以我可以,而不是循环,一次只运行一个过滤器的代码(过滤器在窗口函数和.write之间)。乏味且不太实用,但可能比将所有内容溢出到磁盘更快。

【问题讨论】:

    标签: apache-spark spark-dataframe distributed-computing partitioning


    【解决方案1】:

    是的,这正是 spark 分区的工作原理。因此,它计算整个沿袭,然后以分区形式写入磁盘。这有几个优点。一个重要的原因是并行写入。因此,当计算完成时,spark 可以将所有分区并行写入磁盘。这显着提高了性能。

    如果您想在数据准备好时写入,您不妨按不同的 ID 过滤数据帧,并在循环中计算进程并写入。但是,根据我的经验,这种方法需要在同一个数据帧上进行多次迭代,从而导致巨大的性能损失。

    【讨论】:

    • 当我的所有数据(100 GB)都在磁盘上(必须首先写入磁盘)时,并行写入器有什么用?我的问题的重点是将数据保存在内存中以加快写入速度并避免溢出到磁盘。
    • 如果你想连续写那么spark需要评估dan。几次。它将根据分区ID过滤输入数据集,然后进行操作,最后保存。这个过程比较耗时。如果您不想将数据溢出到磁盘,则可能需要增加 shuffle 内存分数
    • 问题是数据比我的内存大得多,所以这不是一个真正的选择。不过还是谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-05-07
    • 1970-01-01
    • 2014-06-16
    • 2023-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多