【问题标题】:Spark - how to write every n lines to a different output file?Spark - 如何将每 n 行写入不同的输出文件?
【发布时间】:2017-07-21 16:48:04
【问题描述】:

我是 Spark 的新手(虽然我有 Hadoop 和 MapReduce 经验),并且正在尝试处理一个每行包含一条 JSON 记录的巨型文件。我想对每一行进行一些转换,并每 n 条记录(比如 100 万条)写入一个输出文件。因此,如果输入文件中有 750 万条记录,则应生成 8 个输出文件。 我怎样才能做到这一点?您可以使用 Java 或 Scala 提供答案。

使用 Spark v2.1.0。

【问题讨论】:

  • 即使有可能,为什么?如果您有 HDFS,您将拥有将围绕 HDFS 块大小拆分的分区文件...话虽如此,您的数据可以(并且将)在一行中间拆分 /b>
  • 换句话说,有很多 Spark 线程和进程在读取你的文件。你不能只说,好吧 process1,你得到 100 万行,而 process2,你得到下一个......如果你这样做,你最好不要使用 Spark。
  • @cricket_007 这些文件最终将用于填充 Couchbase 中的数据,我们可能不会同时加载所有这些文件。所以我们想要一口大小的块。我选择了 Spark,以便我可以根据需要进行扩展,但如果它不能满足我的要求,我将不得不寻找另一个可以的工具。
  • 关于您的第二条评论,如果重要,我不在乎文件是如何读取的或有多少线程正在这样做。我希望输出按记录号分区。
  • 为什么不能用Couchbase的spark连接器直接喂?

标签: java scala hadoop apache-spark spark-streaming


【解决方案1】:

你可以使用类似的东西:

val dataCount = data.count
val numPartitions = math.ceil(dataCount.toDouble/100000).toInt
val newData = data.coalesce(numPartitions)
newData.saveAsTextFile("output path")

我现在在我的 Windows 游戏电脑上。所以这段代码未经测试,可能包含一些小错误。但总的来说应该可以。

参考:Spark: Cut down no. of output files

附带说明,虽然控制分区大小不是一个坏主意,但随意决定要在一个分区中包含 100 万条记录可能不是可行的方法。通常,您可以通过调整分区大小来优化集群利用率。

编辑:我应该注意,这并不能保证每个分区会有一百万条记录,只是你应该在那个球场有一些东西。

【讨论】:

  • 不起作用。我不认为coalesce 直接控制您链接到的线程所建议的输出文件的数量。实际上,您的答案可能应该是带有Spark: Cut down no. of output files 链接的评论。似乎还有其他人试图徒劳地使用coalesceSpark dataFrame.colaesce...does not seem to work for me
  • 我同意评论评论。不幸的是,rep 在这里工作的愚蠢方式,我可以回答问题而不是制作 cmets。至于你的主要观点,虽然我没有测试上面的代码。我过去曾成功使用过合并。但就您的观点而言,正如我在上面评论的那样,是的,它并不是真正用于管理文件编号。它更多地是关于优化您的集群使用,但应该有让您的文件数量/大小更接近您想要的东西的副作用。如果每个文件恰好有一百万条记录是您项目的硬要求,那么您可能不应该使用 spark。底层
  • rdds 的性质就是不能那样工作。你找到的任何让它发挥作用的解决方案几乎肯定是笨拙的,并且是使用 spark 的次优方式。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-13
  • 1970-01-01
  • 2020-07-25
  • 2019-04-04
  • 2016-05-26
相关资源
最近更新 更多