【发布时间】:2018-09-30 14:53:50
【问题描述】:
重启一个 Kafka Connect S3 sink 任务后,它从主题的开头一直重启写入,并写入了旧记录的重复副本。换句话说,Kafka Connect 似乎失去了它的位置。
所以,我想 Kafka Connect 将当前偏移位置信息存储在内部 connect-offsets 主题中。该主题是空的,我认为这是问题的一部分。
另外两个内部主题connect-statuses 和connect-configs 不为空。 connect-statuses 有 52 个条目。 connect-configs 有 6 个条目;我配置的两个接收器连接器各三个:connector-<name>、task-<name>-0、commit-<name>。
在运行此之前,我手动创建了文档中指定的内部 Kafka Connect 主题:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
我可以验证connect-offsets 主题似乎创建正确:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
这是一个运行 Kafka 10.2.1 的 Confluent Platform v3.2.1 的三服务器集群。
connect-offsets 应该是空的吗?为什么重启任务时Kafka Connect会在主题开头重启?
更新:回应 Randall Hauch 的回答。
- 关于源连接器偏移与接收连接器偏移的说明解释为空
connect-offsets。感谢您的解释! - 我绝对不会更改连接器名称。
- 如果连接器停机约五天后重新启动,连接器偏移位置是否会过期并重置?我看到
__consumer_offsets有cleanup.policy=compact -
auto.offset.reset应该只有在__consumer_offsets没有位置的情况下才会生效吧?
我主要使用系统默认值。我的接收器配置 JSON 如下。我正在使用一个非常简单的自定义分区器来分区 Avro 日期时间字段而不是挂钟时间。该功能似乎已添加到 Confluent v3.2.2 中,因此我不需要该功能的自定义插件。我希望跳过 Confluent v3.2.2 并在可用时直接进入 v3.3.0。
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
【问题讨论】:
标签: amazon-s3 apache-kafka apache-kafka-connect confluent-platform