[Posted]: 2016-04-04 13:51:49
[Problem description]:
I'm trying to "convert" a Spark application I wrote in Java to Scala. Since I'm new to Scala and to Spark's Scala API, I don't know how to write this "transformToPair" function in Scala:
Java:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String, Float>, JavaPairRDD<String, Boolean>> findOutliersPerComparisonFunction =
    new Function<JavaPairRDD<String, Float>, JavaPairRDD<String, Boolean>>() {
        public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
            float avgOfAll;
            if (v1.count() > 0) {
                // Look up the average stored under the "all" key.
                avgOfAll = v1.filter(new Function<Tuple2<String, Float>, Boolean>() {
                    public Boolean call(Tuple2<String, Float> v1) throws Exception {
                        return v1._1().equals("all");
                    }
                }).values().collect().get(0);
            } else {
                avgOfAll = 0.0f;
            }
            final float finalAvg = avgOfAll;
            // Flag every value that lies above the overall average.
            JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
                public Boolean call(Float v1) throws Exception {
                    return v1 > finalAvg;
                }
            });
            // Drop the "all" entry itself from the result.
            return rddBool.filter(new Function<Tuple2<String, Boolean>, Boolean>() {
                public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                    return !v1._1().equals("all");
                }
            });
        }
    };
Here is my attempt in Scala:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform { rdd =>
  var avgOfAll = 0.0
  if (rdd.count() > 0) {
    avgOfAll = rdd.filter { case (k, v) => k == "all" }.map { case (k, v) => v }.collect()(0)
  }
  val finalAvg = avgOfAll
  val rddBool = rdd.map { case (k, v) => (k, v > finalAvg) }
  val rddNew = rddBool.filter { case (k, v) => k != "all" }
}
I get the following error message:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
Can anyone help me? How do I return the "rddNew" DStream?
If I write
return rddNew
at the end of the "transform" function, I get the following error:
<console>:293: error: return outside method definition
return rddNew
^
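For context on the two errors: in Scala the value of a block is its last expression, and `val rddNew = ...` is a definition whose value is `Unit` (hence "found: Unit"), while `return` is only legal inside a `def` (hence "return outside method definition"). Ending the transform block with a bare `rddNew` would fix it. A minimal self-contained sketch of the same logic, using a plain `Seq` in place of the RDD so it runs without Spark (the function name `findOutliers` is made up for illustration):

```scala
// Sketch of the transform body. Key point: the block's result is its
// LAST expression, so we end with `rddNew` instead of a val definition.
def findOutliers(pairs: Seq[(String, Float)]): Seq[(String, Boolean)] = {
  // Average stored under the "all" key, or 0.0f if the input is empty.
  val avgOfAll: Float =
    if (pairs.nonEmpty) pairs.filter { case (k, _) => k == "all" }.map(_._2).head
    else 0.0f
  // Flag every value that lies above the overall average.
  val rddBool = pairs.map { case (k, v) => (k, v > avgOfAll) }
  // Drop the "all" entry itself from the result.
  val rddNew = rddBool.filter { case (k, _) => k != "all" }
  rddNew // last expression = value of the block; no `return` needed
}

println(findOutliers(Seq(("all", 2.0f), ("plug1", 3.0f), ("plug2", 1.0f))))
```

In the real Spark code the fix would be the same: make `rddNew` the last line of the `transform { rdd => ... }` block.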
[Discussion]:
Tags: java scala apache-spark spark-streaming