【问题标题】:Kafka Connect Sink (GCS) only reading from latest offset, configure to read from earliest?Kafka Connect Sink(GCS)仅从最新偏移读取,配置为从最早读取?
【发布时间】:2019-12-13 11:13:28
【问题描述】:

如上所述,我目前正在设置一个 Kafka Connect Sink 以将数据从 Kafka 接收到 Google Cloud Storage。

然而,一切进展顺利 - 它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只会将新生成的消息下沉到 GCS,而不是来自 Kafka 的现有消息。我已经尝试删除 kafka 连接存储/偏移主题,创建新的连接器名称等。但是,它总是从最新的偏移开始。

如果无论如何要为 Kafka Connect GCS Sink 配置最早的偏移量?我还没有看到任何配置来处理这个问题

https://docs.confluent.io/current/connect/kafka-connect-gcs/configuration_options.html

https://docs.confluent.io/current/connect/references/allconfigs.html

我已尝试删除任何 kafka 连接主题/文件存储,以及从新的连接器名称开始

我看到连接器启动后生成的 Kafka Connect 接收器消息。

我期望/需要消息从最早的可用偏移量下沉,即。如果没有为连接器提交偏移量,则从最早的消息开始

【问题讨论】:

    标签: apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    当您第一次创建连接器时,默认情况下会采用earliest 偏移量。您应该在 Connect 工作器日志中看到这一点:

    [2019-08-05 23:31:35,405] INFO ConsumerConfig values:
            allow.auto.create.topics = true
            auto.commit.interval.ms = 5000
            auto.offset.reset = earliest
    …
    

    您可以通过更改 Worker 配置来覆盖它:consumer.auto.offset.reset

    当您删除连接器并重新创建它时,偏移量将被保留并重复使用。

    如果您使用 new 名称创建连接器,默认情况下它将使用连接工作程序 (earliest) 中设置的偏移量。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-10
      • 2017-01-20
      • 2022-06-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多