[Posted]: 2016-07-01 19:35:21
[Question]:
I am sending the same object to a JavaPairDStream three times. I update its state, but it ends up saved three times, as printing the JavaPairDStream confirms.
Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>> mappingFunction =
        new Function3<InputMessageKey, Optional<InputMessage>, State<InputMessage>, Tuple2<InputMessageKey, InputMessage>>() {
    @Override
    public Tuple2<InputMessageKey, InputMessage> call(InputMessageKey key, Optional<InputMessage> value, State<InputMessage> state) {
        // value.get() assumes a value is always present (i.e. no state timeout is configured)
        InputMessage inputMessage = value.get();
        // Record emitted into the mapped stream for this input record
        Tuple2<InputMessageKey, InputMessage> output = new Tuple2<>(key, inputMessage);
        // Overwrite the state stored for this key
        state.update(inputMessage);
        return output;
    }
};
Printed stream:
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
(com.input.InputMessageKey@220593a0,com.input.InputMessage@781bfd72)
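A likely explanation (hedged, based on the documented semantics of `mapWithState`): the DStream returned by `mapWithState` contains whatever the mapping function returns for every input record, so three inputs produce three outputs even when they share a key. The per-key state is kept separately and is exposed by `stateSnapshots()` on that returned stream. The model below does not use Spark at all; it is a plain-Java sketch of those two behaviors, and the class `MapWithStateModel`, its method `processBatch`, and the string keys/values are hypothetical stand-ins for `InputMessageKey` and `InputMessage`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (NOT Spark code) of one mapWithState batch:
 * the mapping function runs once per input record and every return value is
 * emitted, while the state store keeps at most one value per key.
 */
public class MapWithStateModel {

    /**
     * Processes one batch. Returns the "mapped stream" (one entry per input
     * record) and updates the given state map (one entry per key).
     */
    static List<Map.Entry<String, String>> processBatch(
            List<Map.Entry<String, String>> batch, Map<String, String> state) {
        List<Map.Entry<String, String>> mapped = new ArrayList<>();
        for (Map.Entry<String, String> record : batch) {
            // Counterpart of state.update(inputMessage): overwrites the key's state.
            state.put(record.getKey(), record.getValue());
            // Counterpart of "return output": emitted for every input record.
            mapped.add(Map.entry(record.getKey(), record.getValue()));
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, String> state = new LinkedHashMap<>();
        Map.Entry<String, String> msg = Map.entry("key-1", "message-1");
        // The same record sent three times, as in the question.
        List<Map.Entry<String, String>> mapped = processBatch(List.of(msg, msg, msg), state);
        System.out.println("mapped stream size: " + mapped.size());
        System.out.println("state snapshot: " + state);
    }
}
```

Under this reading, printing the mapped stream shows three entries while the state holds one. In the real job, printing `stateSnapshots()` on the stream returned by `mapWithState(StateSpec.function(mappingFunction))` should therefore show a single entry per key.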
[Discussion]:
Tags: java apache-spark apache-kafka spark-streaming