【问题标题】:Problems with the state functions in Spark StreamingSpark Streaming 中的状态函数问题
【发布时间】:2017-04-19 23:29:28
【问题描述】:

我尝试使用 Spark Streaming,并希望拥有一个可以在处理每个批次后更新的全局状态对象。据我所知,我至少有两种选择: 1. 使用mapWithState,其中Spark会在每批处理完毕后自动更新状态 2.使用updateStateByKey函数,这里我必须自己调用更新

对我来说,这两种情况都很好,但我两次都遇到同样的错误。这是我的两种情况及其错误的示例代码:

    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(String word, Optional<Integer> one,
          State<Integer> state) {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
      }
    };


    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));;
               Tuple2<String, Long> output = new Tuple2<>(word, sum);
               state.update(sum);
               return new String("Test");
        }
    });  

这个取自 Spark 本身提供的示例。遗憾的是,我收到关于StateSpec.function(...) 的以下错误:

StateSpec类型中的方法function(Function3,State,MappedType>)不适用于参数(Function3,State,Tuple2 >)

使用:

JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
            public Optional<Long> call(List<Long> values, Optional<Long> state) {
                Long newSum = state.orElse((long)0);
                for(long x : values){
                    newSum +=x;
                }
                return Optional.of(newSum);
              });

导致类似的错误:

JavaPairDStream 类型中的方法 updateStateByKey(Function2,Optional,Optional>) 不适用于参数(new Function2 ,可选>(){})

我的导入快照是:

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;

我希望有人能帮我找出我的错误。

【问题讨论】:

  • 你能写下你的进口声明吗?看起来您从错误的包中导入了 Function2 和 Function3
  • 感谢您的信息,我应该添加这些。可悲的是,它们是从正确的包中导入的(我认为),就像我添加了其他功能一样。
  • 您是否已将 Spark-Streaming 添加到您的项目中?马文之类的?我写了一些小应用程序,和你的映射功能完全一样,没关系
  • 我发现了错误......你让我走上了错误的轨道。原来,我阅读的示例是针对 Spark 2.0.2,而我使用的是 1.6.1。在两者之间,他们改变了可选对象的导入位置。我从错误的包中导入可选。更正了,现在一切正常。感谢您的提示!我现在如何在我的问题中正确标记?

标签: java apache-spark spark-streaming


【解决方案1】:

再补充一点,如果您使用的是最新的 Spark 2.3.0 版本,请使用以下包导入 Optional 以解决相同的问题。

Java 代码:

import org.apache.spark.api.java.Optional;

【讨论】:

    猜你喜欢
    • 2014-09-11
    • 1970-01-01
    • 2016-12-24
    • 2017-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-04
    相关资源
    最近更新 更多