【问题标题】:Spark performance transposing a DataFrame转置 DataFrame 的 Spark 性能
【发布时间】:2020-03-18 20:19:15
【问题描述】:

目标:转置从大型、高延迟数据存储中的 30,000 个表中收集的一组行:

  • 使用 spark.sql() 从每个表中提取大量行到 DataFrame 中
  • 将每列转换为单独的 CSV 文档,其中包含列值数组
  • 将此文档写入文件服务器

我解决问题的方法如下,并且成功了一半。

  • 对于数据存储区中的每个表,将 spark.sql() 行放入 DataFrame 中
  • 使用以下内容收集并提取每一列:
def getColumn(df: DataFrame, columnName: String): Seq[Option[String]] = {
      df.select(columnName).collect().map(_ (0)).toSeq
   }
  • 将每一列的结果转换为CSV
  • 作为文档发布到文件服务器

Spark 运行时,我观察作业性能,collect 步骤自然是一个巨大的瓶颈;有时几十分钟。由于作业运行了几个小时,它总是将提取速度降低 10 到 50 倍。上面的整个序列是在不同的线程中完成的。线程池大小是主机上处理器数量的两倍。

我的问题是: 我正在寻找关于如何改进这项工作的设计的建议,或者关于如何提高列提取效率​​的想法。

我想知道我的问题是否最适合 Spark。将每个数据帧的内容放在单个主机上以提取列不是更好吗?仅将行并行化以收集列似乎并没有增加价值。

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    一些想法:

    1. 不要单独获取列!
    2. 在驱动程序代码中使用多线程启动并行作业,例如使用 Scala 的并行集合循环遍历您的表
    3. 为什么要收集数据?使用 spark 内置支持以分布式方式将 CSV 写入 HDFS,然后在完成后将其移动到文件服务器

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-10-24
      • 2019-10-22
      • 2019-07-18
      • 2022-01-22
      • 1970-01-01
      • 2023-03-26
      • 2019-09-18
      • 2018-08-07
      相关资源
      最近更新 更多