【发布时间】:2020-03-18 20:19:15
【问题描述】:
目标:转置从大型、高延迟数据存储中的 30,000 个表中收集的一组行:
- 使用 spark.sql() 从每个表中提取大量行到 DataFrame 中
- 将每列转换为单独的 CSV 文档,其中包含列值数组
- 将此文档写入文件服务器
我解决问题的方法如下,并且成功了一半。
- 对于数据存储区中的每个表,将 spark.sql() 行放入 DataFrame 中
- 使用以下内容收集并提取每一列:
def getColumn(df: DataFrame, columnName: String): Seq[Option[String]] = {
df.select(columnName).collect().map(_ (0)).toSeq
}
- 将每一列的结果转换为CSV
- 作为文档发布到文件服务器
Spark 运行时,我观察作业性能,collect 步骤自然是一个巨大的瓶颈;有时几十分钟。由于作业运行了几个小时,它总是将提取速度降低 10 到 50 倍。上面的整个序列是在不同的线程中完成的。线程池大小是主机上处理器数量的两倍。
我的问题是: 我正在寻找关于如何改进这项工作的设计的建议,或者关于如何提高列提取效率的想法。
我想知道我的问题是否最适合 Spark。将每个数据帧的内容放在单个主机上以提取列不是更好吗?仅将行并行化以收集列似乎并没有增加价值。
【问题讨论】:
标签: apache-spark apache-spark-sql