【问题标题】:Flink Filter Stream based on another Stream DeterministicallyFlink Filter Stream Deterministically 基于另一个 Stream
【发布时间】:2020-06-17 14:06:07
【问题描述】:

我在 Flink 中有 2 个 DataStream(带有通用时间戳和来自 Kafka),其中一个包含一些信号值,另一个包含活动(简单的活动-非活动)信息。我用简单的状态private ValueState<Boolean> seen; 尝试了RichCoProcessFunction,结果是不确定的。如果我使用startFromEarliest 在同一组数据(具有相同的时间戳)上运行,我有时会过滤不同的值。我怎样才能使它具有确定性?我在下面分享我的KeyedCoProcessFunction 骨架。

private ValueState < Boolean > seen;

@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor < Boolean > descriptor = new ValueStateDescriptor < > (
        // state name
        "have-seen-key",
        // type information of state
        TypeInformation.of(new TypeHint < Boolean > () {}));
    seen = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement1(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (seen.value() == Boolean.TRUE) {
        out.collect(value);
    }
}

@Override
public void processElement2(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (value.value == 1) {
        seen.update(Boolean.TRUE);

    } else {
        seen.update(Boolean.FALSE);
    }

}

【问题讨论】:

    标签: java stream apache-flink


    【解决方案1】:

    可以使用RichCoProcessFunction 来实现您想要的那种事件时间连接,但它可能有点复杂。您可能更愿意将其实现为 join with a temporal table function

    【讨论】:

      【解决方案2】:

      它不是确定性的原因是这两个来源以不同的速度产生元素。使其更具确定性的最简单方法是使用 EventTime。这意味着您需要为控制记录和数据记录分配时间戳。然后,Flink 将为您的元素发出水印。

      然后您可以简单地缓冲并等待发送或丢弃元素,直到您收到控制流的 Watermark,这意味着控制流中没有任何变化。

      如果没有时间戳,在这种情况下几乎不可能引入确定性行为,因为您永远无法准确判断给定记录何时到达以及哪些记录应该被删除以及哪些记录应该被发出。

      【讨论】:

      • 关于活动时间我同意,但我可能有一些遗漏的地方。我设置了TimeCharacteristic.EventTime 并使用了事件时间分配器,但我仍然有一个不确定的结果。我的 KeyedCoProcessFunction 需要哪些修改?
      • 正如我在帖子中所说,您需要模仿 TemporalTable 的行为,即您需要等待发出任何结果或丢弃它们,直到水印到达。然后您可以删除/发出时间戳低于水印的结果。或者,正如@David 在他的回答中所描述的那样,您可以使用 TemporalTable。
      猜你喜欢
      • 2017-03-05
      • 2018-01-07
      • 1970-01-01
      • 1970-01-01
      • 2018-08-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多