您可以使用 wholeTextFiles 来读取 rdd。这将读取每个文件,文件名作为键,文件的全部内容作为值。从那里,您应该能够使用 flatMapValues 将每条记录分成自己的 k/v 对。
val input = sc.wholeTextFiles(s3://...)
val inputFlat = input.flatMapValues(line => line.split("\n"))
对于这个例子,如果你的路径是 /user/hive/date=December/part-0000 并且 part-0000 的内容是
Joe December-28 Something
Ryan December-29 AnotherThing
输出如下所示:
input.take(1)
(/user/hive/date=December/part-0000, Joe December-28 Something\n Ryan December-29 AnotherThing)
inputFlat.take(2)
(/user/hive/date=December/part-0000, Joe December-28 Something)
(/user/hive/date=December/part-0000, Ryan December-29 AnotherThing)
我想您可以尝试以下方法。读取记录会有点慢,但是重新分区后可以最大化并行处理
inputFlat.flatMapValues(//some split).repartition(numWorkers)
我们可以尝试的另一件事是使用它:
在 hive 中,您可以使用名为 INPUT__FILE__NAME 的虚拟列检索找到记录的文件,例如:
select INPUT__FILE__NAME, id, name from users where ...;
我不确定它是否会起作用,但您可以尝试在您的 .sql api 中使用它。您必须确保您的 sqlContext 具有 hive-site.xml。