【发布时间】:2018-03-08 00:38:05
【问题描述】:
我有一个带有 flink kafka 消费者的流(kafka msgs 正在流式传输到一个主题),我注意到我正在寻找解决一个有趣的行为。
当数据流入时,如果它在一个窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。
示例流程:
env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);
我正在使用 FlinkKafkaConsumer010,并将 env TimeCharacteristic 设置为 EventTime。和 consumer.assignTimestampsAndWatermarks(new PeriodicWatermarks())
private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{
private long currentMaxTimestamp;
private final long maxOutOfOrderness;
public PeriodicWatermarksAuto(long maxOutOfOrderness){
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(String t, long l) {
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
}
}
如果我的窗口是 10 秒,而我的数据流仅包含 8 秒的数据(然后停止流式传输一段时间),则 flatMap->sink 不会处理,直到新的后续数据流入。
示例数据流处理问题:(每个 x 是每秒一条数据 )
xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^
类似地,例如,如果我有 35 秒的流数据(我的窗口也是 10 秒)只有 3 个窗口的数据触发,其余 5 秒的数据从不处理。
...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^
最后,如果我的窗口是 10 秒,而我只有有 5 秒的流数据,那么 flatmap->sink 永远不会发生。
我的问题是,如果我们在一段时间后看不到数据,有没有办法触发窗口数据处理?
如果我的数据正在实时流式传输,我可以看到有一段无数据,并且不希望最后一个窗口(比如说只有 5 秒的数据)不得不等待一些不确定的时间,直到新数据进来了,我想要窗口时间过去后最后一个窗口的结果。
大声思考,这似乎是由于使用 EventTime 而不是 ProcessingTime,或者,我的水印没有正确生成以使最后一个窗口实际触发......不确定可能两者兼而有之?如果您的流结束最后一位未触发,我认为这对任何人来说都是一个问题。我会说我可能会发送一个流结束消息,但是如果由于源中断上游而导致流结束,这将无济于事。
编辑:所以我更改为处理时间,它确实正确处理了最后一个窗口中的数据,所以我猜 EventTime 毕竟是罪魁祸首,我认为自定义触发器或正确的窗口水印可能是答案......
感谢您的帮助!
【问题讨论】:
标签: apache-kafka apache-flink flink-streaming