【Question Title】: Rewrite Spark Java Application in Scala
【Posted】: 2016-04-04 13:51:49
【Question】:

I am trying to "convert" a Spark application that I wrote in Java into Scala. Since I am new to Scala and to Spark's Scala API, I don't know how to write the equivalent of this "transformToPair" call in Scala:

Java:

JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {

        float avgOfAll;
        if(v1.count() > 0) {
            avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }

        final float finalAvg = avgOfAll;

        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });


        return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};

Here is my attempt in Scala:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}

I get the following error message:

<console>:281: error: type mismatch;
 found   : Unit
 required: org.apache.spark.rdd.RDD[?]
       }
       ^

Can someone help me? How do I return "rddNew" from the "transform" function?

If I put

return rddNew

at the end of the "transform" function, I get the following error:

<console>:293: error: return outside method definition
       return rddNew
       ^

【Comments】:

    Tags: java scala apache-spark spark-streaming


    【Solution 1】:

    You have to actually return the last value, e.g. like this:

    val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
      var avgOfAll = 0.0
    
      if(rdd.count() > 0) {
        avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
      }
    
      val finalAvg = avgOfAll
    
      val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
    
      val rddNew = rddBool.filter({case(k, v) => (k != "all")})
    
      rddNew
    }
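
    The `return outside method definition` error comes from the fact that in Scala, `return` is only legal inside a method declared with `def`; a block (and a function literal) is an expression whose value is simply its last expression. A quick illustration, unrelated to Spark:

    // In Scala a block is an expression: its value is its last expression.
    val classify: Int => String = { n =>
      val label = if (n > 0) "positive" else "non-positive"
      label   // no `return` keyword; this is the value of the closure
    }
    // classify(3) == "positive"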
    

    Or skip defining the intermediate variable altogether:

    val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
      var avgOfAll = 0.0
    
      if(rdd.count() > 0) {
        avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
      }
    
      val finalAvg = avgOfAll
    
      val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
    
      rddBool.filter({case(k, v) => (k != "all")})
    }
    

    Even more idiomatic Scala would be:

    val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
    
      val finalAvg = if(rdd.count() > 0) {
        rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
      } else { 0.0 }
    
      val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
    
      rddBool.filter({case(k, v) => (k != "all")})
    }
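
    For completeness, here is a minimal sketch of how that transform could sit in a full streaming job. The `queueStream` inputs are hypothetical stand-ins for your real sources; `avgAll1h` and `avgPerPlug1h` are assumed to be `DStream[(String, Float)]`, as implied by the Java signature above:

    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    object OutlierJob {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("OutlierJob").setMaster("local[2]")
        val ssc  = new StreamingContext(conf, Seconds(10))

        // Hypothetical inputs standing in for the real avgAll1h / avgPerPlug1h streams
        val avgAll1h: DStream[(String, Float)] =
          ssc.queueStream(mutable.Queue.empty[RDD[(String, Float)]])
        val avgPerPlug1h: DStream[(String, Float)] =
          ssc.queueStream(mutable.Queue.empty[RDD[(String, Float)]])

        val outlierPairDStream: DStream[(String, Boolean)] =
          avgAll1h.union(avgPerPlug1h).transform { rdd =>
            // overall average comes from the ("all", avg) entry, if the batch is non-empty
            val finalAvg =
              if (rdd.count() > 0) rdd.filter { case (k, _) => k == "all" }.values.first()
              else 0.0f

            rdd.mapValues(_ > finalAvg)              // flag values above the overall average
               .filter { case (k, _) => k != "all" } // drop the "all" marker entry itself
          }

        outlierPairDStream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }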
    

    【Discussion】:

    • That did it for me. I just used the "rddNew" line without the RETURN keyword and now it works! You saved my day, thank you!