【发布时间】:2015-09-30 14:02:36
【问题描述】:
我正在尝试将文件目录加载到 Spark RDD 中,并且我需要为每一行附加原始文件名。
我无法找到使用 sc.textFile 的常规 RDD 执行此操作的方法,因此我现在尝试使用 WholeTextFiles 方法来加载每个文件及其文件名。
我正在使用此代码:
val lines =
sc.wholeTextFiles(logDir).flatMap{ case (filename, content) =>
val hash = modFiles.md5(filename)
content.split("\n")
.filter(line =>
<filter conditions>
.map(line => line+hash)
}
但是这段代码给了我一个 Java 堆内存不足的错误,我猜它试图一次加载所有文件?
有没有办法不使用 wholeTextFiles 来解决这个问题和/或有没有办法不使用 wholeTextFiles 一次加载所有文件?
【问题讨论】:
-
val lines = sc.wholeTextFiles(logDir).lines.first()。如果没有 Java 堆内存不足错误,这可以正常工作吗? -
是的,我猜它指向了 flatMap。文件不是那么大 ~50MB - ~100MB。
-
一次有多少工作任务?尝试设置 numPartitions,以免一次全部运行。
-
@DaunnC 设置 numPartitions 不起作用。我尝试了 2、10、50、100、500。该作业在提交 67 个任务的随机播放请求中失败。
-
由于文件名很早就开始不同,我相信字符串比较不一定是减速的主要来源。但是,如果您担心,那么您可以单独获取文件名列表,计算哈希值,然后将其作为文件名映射 -> 哈希值进行广播,并以这种方式将文件名替换为哈希值。
标签: scala apache-spark