【问题标题】:Kafka connect consumer referencing offset and storing in messageKafka连接消费者引用偏移量并存储在消息中
【发布时间】:2020-08-28 15:05:02
【问题描述】:

如果我使用 kafka-connect 来消费消息并存储到 s3(使用 kafka-connect s3 连接器),我是否可以将消息偏移量与事件有效负载一起存储?我想要这些数据来对消息进行排序,并检查是否可能存在任何空白或检查我收到的消息中是否有任何重复。 (例如,如果我的消费者偏移量被意外破坏并且我重新启动了 kafka-connect)。这是可能的还是我应该为这种类型的功能编写一个自定义订阅者?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    根据Insert Field 转换的文档,您可以使用offset.field

    Name            Description
    offset.field    Field name for Apache Kafka® offset. This is only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default).
    

    总体而言,您的单消息转换 (SMT) 配置如下所示:

    "transforms": "InsertField",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.offset.field": "offsetColumn"
    

    如果这不是您正在寻找的,那么始终可以选择创建您的 customised 转换

    【讨论】:

    • 我在寻找类似的东西,这个例子对我很有帮助。我有一个关于!为强制性的。会是:“!transforms.InsertField.offset.field”:“offsetColumn”
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-03-05
    • 2019-06-08
    • 2019-06-10
    • 2019-05-01
    • 2018-02-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多