【问题标题】:Spark: NullPointerException when RDD isn't collected before mapSpark:在映射之前未收集 RDD 时出现 NullPointerException
【发布时间】:2015-06-20 06:37:56
【问题描述】:

我正在尝试在 Spark/Scala 中编写一个函数,该函数需要 2 个 RDD,第一个中的每个项目,从第二个中查找适合第一个日期范围的项目。这是我为表达问题而编写的代码(为了清楚起见,我添加了注释):

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, RDD[PerfLog])] =
{
    durationLog.map((duration: PerfLog) => {
        val sizes = sizeLogs.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}

如果我在函数末尾的 map 表达式上调用 .collect(),我会得到这个异常。

15/06/19 15:57:05 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)

我发现,如果我修改上面的代码,以便在开始时收集两个参数并将函数的其余部分视为数组,它就可以正常运行。

def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : Array[(PerfLog, Array[PerfLog])] =
{
    val durationData = durationLog.collect()
    val sizeData = sizeLogs.collect()

    durationData.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
        (duration, sizes)
    })
}

虽然这可行,但这显然不是正确的答案,因为参数可能会变得非常大。

为什么当它被当作一个数组而不是一个 RDD 时它可以工作?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您不能在迭代一个 RDD 时迭代其他 RDD。为了克服这个问题,你不需要收集两个 RDD,一个更好的解决方案是收集一个 RDD(较小的,以获得更好的性能),然后使用这两个数据结构(RDD 和数组)来获得你的 n^2 操作.

    def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, Array[PerfLog])] =
    {
       val sizeData = sizeLogs.collect
    
       durationLog.map((duration: PerfLog) => {
        val sizes = sizeData.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
         (duration, sizes)
       })
    }
    

    为了获得更好的性能,请使用 Spark Broadcast。它实际上将变量广播到所有节点。作为

    def buildRelationShip(sizeLogs: RDD[PerfLog], durationLog : RDD[PerfLog]) : RDD[(PerfLog, Array[PerfLog])] =
    {
       val sizeData = sc.broadcast(sizeLogs.collect)
    
       durationLog.map((duration: PerfLog) => {
        val sizes = sizeData.value.filter((size: PerfLog) => size.StartTime >= duration.StartTime && size.EndTime <= duration.EndTime)
         (duration, sizes)
       })
    }
    

    希望对你有所帮助。

    【讨论】:

      【解决方案2】:

      您不能将一个 RDD 放入另一个 RDD。 RDD 只是日期巫婆只能在驱动程序上使用的指针。另一方面,地图是在工人身上执行的。

      【讨论】:

        猜你喜欢
        • 2016-12-23
        • 1970-01-01
        • 1970-01-01
        • 2019-09-10
        • 1970-01-01
        • 1970-01-01
        • 2021-03-14
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多