【Posted】: 2022-01-05 18:42:36
【Question】:
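I can't figure out how watermarks are supposed to work when ingesting data from Apache Kafka.
- I read that Flink handles watermarks automatically by taking the timestamp from the message, but nowhere is it specified where exactly that timestamp comes from: the message payload, a header, or CreateTime?
- I tried extracting a Unix timestamp in milliseconds from the payload, putting it in a header, and setting it as CreateTime; none of it worked. The watermark does not advance, so the event-time windows are never triggered.
Event format: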
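For reference on the first point, here is a minimal plain-Java sketch of the two places a timestamp can come from when a timestamp assigner is involved. It assumes, as I understand the API, that the second argument of Flink's `TimestampAssigner#extractTimestamp(element, recordTimestamp)` is the timestamp the source already attached to the record, which for a Kafka source would be the Kafka record timestamp rather than a header. The `TimestampSource` interface and the sample record timestamp are hypothetical stand-ins so the sketch runs without Flink on the classpath.

```java
// Plain-Java sketch, no Flink dependency. TimestampSource is a hypothetical
// stand-in that mirrors the shape of Flink's
// TimestampAssigner#extractTimestamp(T element, long recordTimestamp).
final class TimestampSourceSketch {

    @FunctionalInterface
    interface TimestampSource {
        long extractTimestamp(String element, long recordTimestamp);
    }

    // Option 1: keep whatever timestamp the source already attached to the record.
    static final TimestampSource FROM_RECORD =
            (element, recordTimestamp) -> recordTimestamp;

    // Option 2: ignore the attached timestamp and parse the second
    // comma-separated field of the payload (format: "hello,1641369936000").
    static final TimestampSource FROM_PAYLOAD =
            (element, recordTimestamp) -> Long.parseLong(element.split(",")[1].trim());

    public static void main(String[] args) {
        String event = "hello,1641369936000";
        long attachedTimestamp = 1641400000000L; // made-up value for illustration

        System.out.println("from record:  " + FROM_RECORD.extractTimestamp(event, attachedTimestamp));
        System.out.println("from payload: " + FROM_PAYLOAD.extractTimestamp(event, attachedTimestamp));
    }
}
```

Under that assumption, an assigner that parses the payload overrides the record timestamp entirely, so changing CreateTime or adding a header would not be expected to change the event time the windows see.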
hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
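To make the expected behaviour concrete, the following plain-Java sketch works through the window and watermark arithmetic for the sample events. It is an illustration under stated assumptions, not Flink code: it assumes 5-second tumbling windows with zero offset, non-negative timestamps, a watermark that trails the highest seen timestamp by 1 ms (which is how I understand the monotonous-timestamps strategy to behave), and a window that can fire only once the watermark reaches the window's last millisecond (the rule the default event-time trigger applies). The class and method names are made up for this sketch.

```java
import java.util.List;

// Plain-Java sketch of tumbling-window and watermark arithmetic (no Flink dependency).
final class WindowSketch {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Event time is the second comma-separated field of the payload.
    static long extractTimestamp(String event) {
        return Long.parseLong(event.split(",")[1].trim());
    }

    // Start of the zero-offset tumbling window containing a non-negative timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to the window starting at windowStart.
    static long windowMaxTimestamp(long windowStart, long size) {
        return windowStart + size - 1;
    }

    // Assumed watermark for ascending timestamps: highest timestamp seen, minus 1 ms.
    static long watermarkAfter(List<String> events) {
        if (events.isEmpty()) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress in event time
        }
        long max = Long.MIN_VALUE;
        for (String event : events) {
            max = Math.max(max, extractTimestamp(event));
        }
        return max - 1;
    }

    public static void main(String[] args) {
        List<String> events = List.of(
                "hello,1641369936000",
                "hello,1641369937000",
                "hello,1641369938000",
                "hello,1641369939000");

        for (String event : events) {
            long start = windowStart(extractTimestamp(event), WINDOW_SIZE_MS);
            System.out.println(event + " -> window [" + start + ", " + (start + WINDOW_SIZE_MS) + ")");
        }

        long watermark = watermarkAfter(events);
        long maxTs = windowMaxTimestamp(
                windowStart(extractTimestamp(events.get(0)), WINDOW_SIZE_MS), WINDOW_SIZE_MS);
        System.out.println("watermark=" + watermark
                + " windowMaxTimestamp=" + maxTs
                + " canFire=" + (watermark >= maxTs));
    }
}
```

Under these assumptions all four sample events land in the window [1641369935000, 1641369940000), and the watermark after the last of them is 1641369938999, which is still short of 1641369939999. The first window would therefore only be able to fire once an event with a timestamp of at least 1641369940000 has been seen.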
Topic:
kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create
Kafka version 3.0.0, Flink 1.14.2
Thanks in advance.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setGroupId("test-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setTopics("testerino")
        .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
            @Override
            public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
                System.out.println(record);
                out.collect(new String(record.value()));
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
        })
        .build();

DataStreamSource<String> streamSource = env.fromSource(
        stringKafkaSource,
        WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
            return Long.parseLong(event.split(",")[1]);
        }),
        "source"
);

streamSource
        .keyBy(k -> k.split(",")[0])
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .trigger(new Trigger<String, TimeWindow>() {
            @Override
            public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.printf("added elem: %s | timestamp: %s | window: %s| watermark: %d%n",
                        element, timestamp, window, ctx.getCurrentWatermark());
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("proccesing time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("event time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("clear");
            }
        })
        .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                System.out.println("watermark: " + context.currentWatermark());
                out.collect(s);
            }
        });
【Discussion】: