【发布时间】:2015-06-20 06:37:56
【问题描述】:
我正在尝试在 Spark/Scala 中编写一个函数,该函数需要 2 个 RDD,第一个中的每个项目,从第二个中查找适合第一个日期范围的项目。这是我为表达问题而编写的代码(为了清楚起见,我添加了注释):
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, RDD[PerfLog])] =
{
durationLog.map((duration: PerfLog) => {
val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
(duration, sizes)
})
}
如果我在函数末尾的 map 表达式上调用 .collect(),我会得到这个异常。
15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
我发现,如果我修改上面的代码,以便在开始时收集两个参数并将函数的其余部分视为数组,它就可以正常运行。
def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : Array[(PerfLog, Array[PerfLog])] =
{
val durationData = durationLog.collect()
val sizeData = sizeLogs.collect()
durationData.map((duration: PerfLog) => {
val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
(duration, sizes)
})
}
虽然这可行,但这显然不是正确的答案,因为参数可能会变得非常大。
为什么当它被当作一个数组而不是一个 RDD 时它可以工作?
【问题讨论】:
标签: scala apache-spark