【问题标题】:Flink - How to use withTimestampAssigner getting time from Event Payload (without using Kafka timestamps)Flink - 如何使用 withTimestampAssigner 从事件有效负载中获取时间(不使用 Kafka 时间戳)
【发布时间】:2021-02-14 09:27:15
【问题描述】:

我试图了解如何在 Kafka Source 的 WatermarkStrategy 中使用 withTimestampAssigner()。我需要使用的“时间”在消息负载中。

为此,我有以下代码:

FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
kafkaData.assignTimestampsAndWatermarks(
        WatermarkStrategy
        .forMonotonousTimestamps()
                .withTimestampAssigner(Event, Event.time))

DataStream<Event> stream = env.addSource(kafkaData);

EventDeserializationSchema() 是这样的:

public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;
    
    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age", CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        
        return TypeInformation.of(Event.class);
    }
}

还有事件:

import java.io.Serializable;

public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}

我想了解的是如何为 withTimeStampAssigner() 提供 时间

.withTimestampAssigner(???))

变量应该是 Event.time 但从 flink 页面我不太明白。

我一直在寻找

这让我有点困惑,因为我不明白在我的情况下解决方案是否非常简单,或者我需要额外的上下文。我发现的所有示例都使用 .forBoundedOutOfOrderness() 或以前版本的 flink,其中实现与此不同:

kafka flink timestamp Event time and watermark

谢谢!

【问题讨论】:

    标签: apache-kafka flink-streaming


    【解决方案1】:

    如果源(例如,FlinkKafkaConsumer)没有提供您想要使用的时间戳,那么您需要提供一个TimestampAssigner。这是一个将事件和先前分配的时间戳(如果有)作为输入并返回时间戳的函数。在您的情况下,可能看起来像这样:

    FlinkKafkaConsumer<Event> kafkaData =
            new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
    
    WatermarkStrategy<Event> wmStrategy = 
            WatermarkStrategy
              .<Event>forMonotonousTimestamps()
              .withTimestampAssigner((event, timestamp) -> event.getTime());
    
    DataStream<Event> stream = env.addSource(
            kafkaData.assignTimestampsAndWatermarks(wmStrategy));
    

    (注意:这不太可行,因为您的 getTime() 方法返回一个字符串。您需要解析字符串并返回一个 long ——通常它是一个 long 表示自纪元以来的毫秒数。)

    涉及TimestampAssignerSupplier.ContextWatermarkGeneratorSupplier.Context 的情况适用于您需要访问较低级别的API 以执行更多自定义操作的情况。在这种情况下,这不是必需的。

    【讨论】:

    • 嗨,大卫,非常感谢您的回答。这个例子对我帮助很大。我缺少的唯一一点是时间指示特定事件发生的时间。当你说我需要解析字符串以返回一个 long 时,我明白了,我试图理解如何去做,但我不明白你写的部分:“通常它会是一个long 表示自纪元以来的毫秒数”。你能详细说明一下吗?我完全没有得到这最后一部分。谢谢!!
    • 我正在考虑您的答案,现在我明白了您的意思。基本上我需要用 UNIX 时间来喂 flink。 (currentmillis.com) 我发现以下非常有用:stackoverflow.com/questions/7784421/… 最后一点是 getTime() 是否需要除以 1000?
    • 很高兴你知道了。在大多数情况下,Flink 的事件时间是无单位的。但是一些辅助方法,例如用于设置窗口持续时间等。遵循时间单位为毫秒的约定,因此如果遵循该模式,您将拥有更轻松的时间。
    • 感谢大卫,您的帮助总是很宝贵! ;-)
    猜你喜欢
    • 2019-04-18
    • 2013-09-16
    • 1970-01-01
    • 1970-01-01
    • 2019-08-10
    • 2017-09-30
    • 2022-11-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多