【发布时间】:2019-03-21 06:45:03
【问题描述】:
由于map 转换中的函数抛出java.lang.NullPointerException,我的 Spark 作业的一个阶段失败。
我的想法是借助 Try 类型从 map 内部获取损坏的 Sale 对象。
所以我故意将函数结果分配给saleOption 变量,以便进行模式匹配。
不幸的是,我当前的实现不起作用,我需要有关如何修复它的建议。将不胜感激任何建议。
这里是初始方法:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = {
rawSales
.map(sale => sale.id -> sale) // throws NullPointerException
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
}
这是我实现我的想法的方式:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = {
rawSales
.map(sale => {
val saleOption: Option[(String, Sale)] = Try(sale.id -> sale).toOption
saleOption match {
case Success(successSale) => successSale
case Failure(e) => throw new IllegalArgumentException(s"Corrupted sale: $rawSale;", e)
}
})
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
}
UPD:我的目的是为了调试目的实现这个想法并提高我的 Scala 知识。我不会使用Try 和Exceptions 进行流量控制。
【问题讨论】:
-
1.您确定 rawSales RDD 的创建没有错误吗?也许你可以构建它,这样你就会有 rawSales: RDD[Option[Sale]]... 2. 你为什么抛出异常?你应该把它过滤掉。
-
@user3725190 实际上我应该提到我的目的是为了调试目的而编写代码。
标签: scala apache-spark exception exception-handling nullpointerexception