【发布时间】:2017-04-19 23:29:28
【问题描述】:
我尝试使用 Spark Streaming,并希望拥有一个可以在处理每个批次后更新的全局状态对象。据我所知,我至少有两种选择:
1. 使用mapWithState,其中Spark会在每批处理完毕后自动更新状态
2.使用updateStateByKey函数,这里我必须自己调用更新
对我来说,这两种情况都很好,但我两次都遇到同样的错误。这是我的两种情况及其错误的示例代码:
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one,
State<Integer> state) {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
};
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));;
Tuple2<String, Long> output = new Tuple2<>(word, sum);
state.update(sum);
return new String("Test");
}
});
这个取自 Spark 本身提供的示例。遗憾的是,我收到关于StateSpec.function(...) 的以下错误:
StateSpec类型中的方法function(Function3
,State ,MappedType>)不适用于参数(Function3 ,State ,Tuple2 >)
使用:
JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> values, Optional<Long> state) {
Long newSum = state.orElse((long)0);
for(long x : values){
newSum +=x;
}
return Optional.of(newSum);
});
导致类似的错误:
JavaPairDStream
类型中的方法 updateStateByKey(Function2 ,Optional
,Optional>) 不适用于参数(new Function2,可选>(){})
我的导入快照是:
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
我希望有人能帮我找出我的错误。
【问题讨论】:
-
你能写下你的进口声明吗?看起来您从错误的包中导入了 Function2 和 Function3
-
感谢您的信息,我应该添加这些。可悲的是,它们是从正确的包中导入的(我认为),就像我添加了其他功能一样。
-
您是否已将 Spark-Streaming 添加到您的项目中?马文之类的?我写了一些小应用程序,和你的映射功能完全一样,没关系
-
我发现了错误......你让我走上了错误的轨道。原来,我阅读的示例是针对 Spark 2.0.2,而我使用的是 1.6.1。在两者之间,他们改变了可选对象的导入位置。我从错误的包中导入可选。更正了,现在一切正常。感谢您的提示!我现在如何在我的问题中正确标记?
标签: java apache-spark spark-streaming