【问题标题】:Apache Spark reduceByKey to sum decimalsApache Spark reduceByKey 对小数求和
【发布时间】:2015-12-03 19:07:18
【问题描述】:

我正在尝试这样映射 RDD(请参阅输出结果)并按十进制值映射 reduce,但我不断收到错误。当我尝试使用带有字数的 reduceByKey() 时,它工作得很好。十进制值的总和是否不同?

val voltageRDD= myRDD.map(i=> i.split(";"))
   .filter(i=> i(0).split("/")(2)=="2008")
   .map(i=> (i(0).split("/")(2),i(2).toFloat)).take(5)

输出:

voltageRDD: Array[(String, Float)] = Array((2008,1.62), (2008,1.626), (2008,1.622), (2008,1.612), (2008,1.612))

尝试减少时:

val voltageRDD= myRDD.map(i=> i.split(";"))
   .filter(i=> i(0).split("/")(2)=="2008")
   .map(i=> (i(0).split("/")(2),i(2).toFloat)).reduceByKey(_+_).take(5)

我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2954.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2954.0 (TID 15696, 10.19.240.54): java.lang.NumberFormatException: For input string: "?"

【问题讨论】:

  • 您确定您的所有数据都是这种格式吗?在您的代码中,第二个 sn-p 将运行整个数据集(因为它将执行 reduceByKey 然后 take),而第一个将仅针对前几条记录运行。如果在第一个 5 记录之后的某个地方(过滤后)你的 i(2)? 它会在第二个 sn-p 但不会在第一个 sn-p 中崩溃,因为 spark 是惰性的。
  • @MateuszDymczyk 感谢您指出这一点。我的数据集不干净,因此格式不一样!
  • 在这种情况下,我将其添加为答案

标签: scala apache-spark apache-spark-sql


【解决方案1】:

如果您的数据包含无法解析为浮点数的列,那么您应该预先过滤掉它们或相应地处理它们。如果您看到不可解析的条目,这样的处理可能意味着您分配一个值0.0f。下面的代码正是这样做的。

val voltageRDD= myRDD.map(i=> i.split(";"))
  .filter(i => i(0).split("/")(2)=="2008")
  .map(i => (i(0).split("/")(2), Try{ i(2).toFloat }.toOption.getOrElse(0.0f)))
  .reduceByKey(_ + _).take(5)

【讨论】:

    【解决方案2】:

    短版:您可能有一行i(2) 等于?

    根据我的评论,您的数据很可能不一致,这在第一个 sn-p 中不会成为问题,因为 take(5) 并且没有需要 spark 对整个数据集执行操作的操作。 Spark 是惰性的,因此只会在从 map -> filter -> map 链获得 5 结果之前执行计算。

    另一方面,第二个 sn-p 将对您的整个数据集执行计算,因此它可以执行reduceByKey,然后才需要 5 个结果,因此它可能会发现您的数据集中太远的问题第一个sn-p。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-03-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-09-15
      • 2016-02-16
      • 2019-03-05
      相关资源
      最近更新 更多