【Question Title】: How does Apache Flink generate watermarks when you ingest data from Apache Kafka?
【Posted】: 2022-01-05 18:42:36
【Description】:

I can't figure out how watermarks are supposed to work when you ingest data from Apache Kafka.

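  1. I read that Flink handles watermarks automatically by taking the timestamp from the message, but the docs don't say exactly where that timestamp comes from. Is it the message payload, a header, or the CreateTime?
  2. I tried extracting a Unix timestamp in milliseconds from the payload, putting it in a header, and setting it as the CreateTime; none of it worked. The watermark does not advance, so the event-time windows never fire.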
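Point 2 is easier to reason about once it is clear that Flink never reads a watermark out of the message: a watermark generator derives it from the timestamps it has already seen. Below is a minimal stdlib-only model, assuming the behaviour of Flink's bounded-out-of-orderness generator with zero delay, which is what forMonotonousTimestamps() is built on. MonotonousGenerator and run are hypothetical stand-ins for WatermarkGenerator.onEvent / onPeriodicEmit, not Flink's own classes.

```java
import java.util.ArrayList;
import java.util.List;

public class MonotonousWatermarkSketch {
    // Hypothetical stand-in for the generator behind forMonotonousTimestamps():
    // it remembers the largest timestamp seen and emits (largest - 1) as the watermark.
    static final class MonotonousGenerator {
        // Starts at Long.MIN_VALUE + 1 so the first emitted watermark is Long.MIN_VALUE.
        private long maxTimestamp = Long.MIN_VALUE + 1;

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long onPeriodicEmit() {
            return maxTimestamp - 1;
        }
    }

    // Feeds the timestamps in order and records the watermark after each one.
    // In Flink the periodic emit is timer-driven (pipeline.auto-watermark-interval,
    // 200 ms by default); here it is simply called after every event.
    static List<Long> run(long[] timestamps) {
        MonotonousGenerator generator = new MonotonousGenerator();
        List<Long> watermarks = new ArrayList<>();
        watermarks.add(generator.onPeriodicEmit()); // before any event arrives
        for (long ts : timestamps) {
            generator.onEvent(ts);
            watermarks.add(generator.onPeriodicEmit());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long[] timestamps = {1641369936000L, 1641369937000L, 1641369938000L, 1641369939000L};
        run(timestamps).forEach(System.out::println);
    }
}
```

With the sample timestamps it prints -9223372036854775808 followed by 1641369935999, 1641369936999, 1641369937999 and 1641369938999. In other words, within a single source subtask the watermark should move as soon as records arrive, so a stuck watermark points at something outside the generator itself.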

Event format:

hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
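With the 5-second TumblingEventTimeWindows used in the question's code, all four sample events fall into the same window. The stdlib-only sketch below reproduces the window-start arithmetic (the formula of Flink's TimeWindow.getWindowStartWithOffset with offset 0) and the firing condition (an event-time window fires once the watermark reaches window.maxTimestamp(), i.e. end - 1). WindowMath and its methods are hypothetical helper names.

```java
public class WindowMath {
    static final long WINDOW_SIZE_MS = 5_000L; // TumblingEventTimeWindows.of(Time.seconds(5))

    // Same arithmetic as TimeWindow.getWindowStartWithOffset(timestamp, 0, WINDOW_SIZE_MS).
    static long windowStart(long timestamp) {
        return timestamp - (timestamp + WINDOW_SIZE_MS) % WINDOW_SIZE_MS;
    }

    // Last timestamp that still belongs to the window (what TimeWindow.maxTimestamp() returns).
    static long windowMaxTimestamp(long timestamp) {
        return windowStart(timestamp) + WINDOW_SIZE_MS - 1;
    }

    // An event-time window fires once the watermark has reached its max timestamp.
    static boolean fires(long windowMaxTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        String[] events = {
            "hello,1641369936000", "hello,1641369937000",
            "hello,1641369938000", "hello,1641369939000"
        };
        for (String e : events) {
            long ts = Long.parseLong(e.split(",")[1]);
            System.out.printf("%s -> window [%d, %d)%n", e, windowStart(ts), windowStart(ts) + WINDOW_SIZE_MS);
        }
        long maxTs = windowMaxTimestamp(1641369936000L);
        // forMonotonousTimestamps() emits (largest timestamp seen - 1) as the watermark.
        System.out.println("fires after ...939000? " + fires(maxTs, 1641369939000L - 1));
        System.out.println("fires after ...940000? " + fires(maxTs, 1641369940000L - 1));
    }
}
```

Every sample event maps to [1641369935000, 1641369940000), and that window only fires once an event with a timestamp of at least 1641369940000 has been seen. Since the source starts from OffsetsInitializer.latest(), only events produced after the job starts count toward that.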

Topic: kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create

Kafka version 3.0.0, Flink 1.14.2

Thanks in advance

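import java.io.IOException;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();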

KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setGroupId("test-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setTopics("testerino")
        .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
            @Override
            public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
                System.out.println(record);
                out.collect(new String(record.value()));
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
        })
        .build();

DataStreamSource<String> streamSource = env.fromSource(
        stringKafkaSource,
        WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
            return Long.parseLong(event.split(",")[1]);
        }),
        "source"
);

streamSource
        .keyBy(k -> k.split(",")[0])
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .trigger(new Trigger<String, TimeWindow>() {
            @Override
            public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.printf("added elem: %s | timestamp: %s | window: %s| watermark: %d%n",
                        element, timestamp, window, ctx.getCurrentWatermark());
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("processing time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("event time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("clear");
            }
        })
        .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                System.out.println("watermark: " + context.currentWatermark());
                out.collect(s);
            }
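        })
        .print(); // attach a sink so the window output is visible

// Nothing runs until the job is submitted.
env.execute("watermark-test");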

【Question Comments】:

    Tags: apache-kafka apache-flink


    【Solution 1】:

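    Flink never provides watermarks automatically, but the KafkaSource does take the timestamp carried by each Kafka record (its CreateTime or LogAppendTime, depending on the topic's message.timestamp.type setting) and uses it to set the timestamp of the StreamRecords it produces. That is the timestamp passed to your timestamp assigner as its second argument.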
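    To make this concrete: in the question's lambda (event, timestamp) -> ..., the timestamp argument is the Kafka record's own timestamp, and the assigner can either parse the payload or simply return that argument. The stdlib-only sketch below shows both options; TimestampAssignerSketch is a hypothetical interface that only mirrors the shape of Flink's TimestampAssigner.extractTimestamp(T element, long recordTimestamp).

```java
public class TimestampAssignerDemo {
    // Hypothetical mirror of Flink's TimestampAssigner<T>: (element, recordTimestamp) -> event time.
    @FunctionalInterface
    interface TimestampAssignerSketch<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Option 1: event time parsed from a "word,epochMillis" payload (what the question does).
    static final TimestampAssignerSketch<String> FROM_PAYLOAD =
            (event, recordTimestamp) -> Long.parseLong(event.split(",")[1].trim());

    // Option 2: keep the timestamp the Kafka source already attached (CreateTime/LogAppendTime).
    static final TimestampAssignerSketch<String> FROM_KAFKA_RECORD =
            (event, recordTimestamp) -> recordTimestamp;

    public static void main(String[] args) {
        String payload = "hello,1641369936000";
        long kafkaTimestamp = 1641369936123L; // hypothetical producer/broker timestamp
        System.out.println(FROM_PAYLOAD.extractTimestamp(payload, kafkaTimestamp));
        System.out.println(FROM_KAFKA_RECORD.extractTimestamp(payload, kafkaTimestamp));
    }
}
```

    In actual Flink code, option 2 is also what happens when withTimestampAssigner(...) is left out, because the default assigner of a WatermarkStrategy keeps the record timestamp. Either way, the assigner only sets timestamps; the watermark still comes from the strategy's generator.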

    I believe https://stackoverflow.com/a/70101290/2000823 explains why you aren't getting any results.
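    A frequent cause of a watermark that never advances in a setup like this one (an assumption here, not a summary of the linked answer): createLocalEnvironment() defaults the parallelism to the number of CPU cores, but testerino has a single partition, so all but one source subtask read nothing. Because a downstream operator's watermark is the minimum over its inputs, those empty subtasks hold it at Long.MIN_VALUE unless they are marked idle. The stdlib-only sketch below models just that minimum; Channel and combined are hypothetical names, not Flink API.

```java
import java.util.List;

public class CombinedWatermark {
    // Simplified view of one upstream source subtask as seen by the window operator.
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    // The downstream watermark is the minimum over all non-idle inputs.
    // Simplification: if every input is idle, this sketch returns Long.MIN_VALUE.
    static long combined(List<Channel> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : inputs) {
            if (!c.idle) {
                anyActive = true;
                min = Math.min(min, c.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long reading = 1641369938999L; // subtask that owns the single partition
        long empty = Long.MIN_VALUE;   // subtasks without a partition never emit a watermark

        // Example: parallelism 4, topic with 1 partition.
        List<Channel> noIdleness = List.of(
                new Channel(reading, false), new Channel(empty, false),
                new Channel(empty, false), new Channel(empty, false));
        List<Channel> withIdleness = List.of(
                new Channel(reading, false), new Channel(empty, true),
                new Channel(empty, true), new Channel(empty, true));

        System.out.println("without idleness: " + combined(noIdleness));
        System.out.println("with idleness:    " + combined(withIdleness));
    }
}
```

    In the question's job, matching the parallelism to the partition count (env.setParallelism(1)) or adding an idle timeout (WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10)), where 10 seconds is an arbitrary choice) should let the watermark move. The Kafka connector documentation notes that the source does not go idle automatically when its parallelism exceeds the number of partitions.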

    【Comments】:
