【问题标题】:Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites everything重启 Kafka Connect S3 Sink 任务丢失位置,完全重写一切
【发布时间】:2018-09-30 14:53:50
【问题描述】:

重启一个 Kafka Connect S3 sink 任务后,它从主题的开头一直重启写入,并写入了旧记录的重复副本。换句话说,Kafka Connect 似乎失去了它的位置。

所以,我想 Kafka Connect 将当前偏移位置信息存储在内部 connect-offsets 主题中。该主题是空的,我认为这是问题的一部分。

另外两个内部主题connect-statusesconnect-configs 不为空。 connect-statuses 有 52 个条目。 connect-configs 有 6 个条目;我配置的两个接收器连接器各三个:connector-<name>task-<name>-0commit-<name>

在运行此之前,我手动创建了文档中指定的内部 Kafka Connect 主题:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

我可以验证connect-offsets 主题似乎创建正确:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

这是一个运行 Kafka 10.2.1 的 Confluent Platform v3.2.1 的三服务器集群。

connect-offsets 应该是空的吗?为什么重启任务时Kafka Connect会在主题开头重启?

更新:回应 Randall Hauch 的回答。

  • 关于源连接器偏移与接收连接器偏移的说明解释为空connect-offsets。感谢您的解释!
  • 我绝对不会更改连接器名称。
  • 如果连接器停机约五天后重新启动,连接器偏移位置是否会过期并重置?我看到__consumer_offsetscleanup.policy=compact
  • auto.offset.reset应该只有在__consumer_offsets没有位置的情况下才会生效吧?

我主要使用系统默认值。我的接收器配置 JSON 如下。我正在使用一个非常简单的自定义分区器来分区 Avro 日期时间字段而不是挂钟时间。该功能似乎已添加到 Confluent v3.2.2 中,因此我不需要该功能的自定义插件。我希望跳过 Confluent v3.2.2 并在可用时直接进入 v3.3.0。

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}

【问题讨论】:

    标签: amazon-s3 apache-kafka apache-kafka-connect confluent-platform


    【解决方案1】:

    Kafka 消费者的默认偏移保留期为 24 小时(1440 分钟)。如果您停止连接器并因此在超过 24 小时内没有进行新的提交,您的偏移量将过期,并且您将在重新启动时作为新的使用者重新开始。您可以使用offsets.retention.minutes 参数修改__consumer_offsets 主题的保留期

    【讨论】:

    • 非常感谢!
    【解决方案2】:

    Kafka Connect 使用connect-offsets 主题(或任何您命名的主题)来存储源连接器 的偏移量,但接收器连接器偏移量是使用普通的 Kafka 消费者组机制存储的。

    连接器可能重新开始的一个原因是连接器名称是否更改。连接器名称用于定义消费者组的名称,因此如果您更改连接器的名称,则重新启动连接器将使用不同的消费者组,其消费者将从头开始。

    另一个原因可能是 Kafka Connect 消费者被配置为每次都从头开始,通过consumer.auto.offset.reset=earliest

    S3 连接器版本 3.3.0(即将推出)修复了多个问题,其中一些问题会影响按时轮换或架构的工作方式。你没有提供你的配置,所以很难说这些是否会导致你看到的行为。

    【讨论】:

    • 惊人的答案。谢谢!我在主要问题中发布了更多详细信息和后续问题。
    • 哇,你节省了我的一天,惊人的答案,我们可以专门为 kafka 连接中的消费者设置 consumer.auto.offset.reset=earliest 吗?
    猜你喜欢
    • 2018-10-20
    • 1970-01-01
    • 2020-02-27
    • 2018-06-16
    • 1970-01-01
    • 2021-10-16
    • 2020-06-15
    • 2021-12-08
    • 2016-02-21
    相关资源
    最近更新 更多