【发布时间】:2020-06-17 14:06:07
【问题描述】:
我在 Flink 中有 2 个 DataStream(带有通用时间戳和来自 Kafka),其中一个包含一些信号值,另一个包含活动(简单的活动-非活动)信息。我用简单的状态private ValueState<Boolean> seen; 尝试了RichCoProcessFunction,结果是不确定的。如果我使用startFromEarliest 在同一组数据(具有相同的时间戳)上运行,我有时会过滤不同的值。我怎样才能使它具有确定性?我在下面分享我的KeyedCoProcessFunction 骨架。
private ValueState < Boolean > seen;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor < Boolean > descriptor = new ValueStateDescriptor < > (
// state name
"have-seen-key",
// type information of state
TypeInformation.of(new TypeHint < Boolean > () {}));
seen = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement1(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
if (seen.value() == Boolean.TRUE) {
out.collect(value);
}
}
@Override
public void processElement2(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
if (value.value == 1) {
seen.update(Boolean.TRUE);
} else {
seen.update(Boolean.FALSE);
}
}
【问题讨论】:
标签: java stream apache-flink