【问题标题】:Unable to use sink connector inside kafka connect无法在kafka connect中使用接收器连接器
【发布时间】:2021-04-05 23:12:18
【问题描述】:

我正在尝试在 kafka connect 中使用 S3 sink 连接器,它会启动并稍后失败。

我的配置如下:

{
    "name": "my-s3-sink3",
     "config": {
         "connector.class":"io.confluent.connect.s3.S3SinkConnector", 
         "tasks.max":"1", 
         "topics":"mysource.topic", 
         "s3.region":"us-east-1", 
         "s3.bucket.name": "topicbucket001", 
         "s3.part.size":"5242880", 
         "flush.size":"1", 
         "storage.class":"io.confluent.connect.s3.storage.S3Storage", 
         "format.class": "io.confluent.connect.s3.format.json.JsonFormat", 
         "partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner", 
         "schema.compatibility":"NONE"
        }
    }

我的 connect-distributed.properties 看起来像:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all

完整的错误日志:

[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

2021-04-06 16:29:04
[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

2021-04-06 16:29:04
[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)

2021-04-06 16:29:04
[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)

消息类型:

{
   "registertime": 1511985752912,
   "userid": "User_6",
   "regionid": "Region_8",
   "gender": "FEMALE"
}

新的错误日志:

【问题讨论】:

  • 数据是如何序列化的?您在连接器(或工作器)配置中使用了哪些键和值转换器?参考:confluent.io/blog/…
  • 你好@RobinMoffatt 用截图编辑了问题。它都是 json 。如何确定连接器中使用了哪些键和值转换器?
  • 我建议您在实际发布的连接器配置中设置转换器,而不仅仅是来自 docker(应该使用 env vars 完成,而不是编辑属性文件)

标签: apache-kafka apache-kafka-connect confluent-platform s3-kafka-connector


【解决方案1】:

问题在于 Key SerDe。根据您的屏幕截图,关键数据是非 JSON 字符串:

User_2
User_9
etc

所以不是

key.converter=org.apache.kafka.connect.json.JsonConverter

使用

key.converter=org.apache.kafka.connect.storage.StringConverter

编辑:

为您的连接器配置尝试这个,明确指定转换器(如@OneCricketeer 所建议)

{
    "name": "my-s3-sink3",
     "config": {
         "connector.class"               : "io.confluent.connect.s3.S3SinkConnector",
         "tasks.max"                     : "1",
         "topics"                        : "mysource.topic",
         "s3.region"                     : "us-east-1",
         "s3.bucket.name"                : "topicbucket001",
         "s3.part.size"                  : "5242880",
         "flush.size"                    : "1",
         "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
         "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
         "value.converter.schemas.enable": "false",
         "storage.class"                 : "io.confluent.connect.s3.storage.S3Storage",
         "format.class"                  : "io.confluent.connect.s3.format.json.JsonFormat",
         "partitioner.class"             : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
         "schema.compatibility"          : "NONE"
        }
    }

【讨论】:

  • 你好@Robin,我已经更新了 key.converter 以使用 StringConverter 但仍然得到同样的错误。 [2021-04-06 12:52:06,656] 错误 WorkerSinkTask{id=s3connect14-0} 在偏移量 0 和时间戳 1617706739466 的主题“快速启动状态”分区 2 中转换消息键时出错:将字节 [] 转换为 Kafka Connect 数据失败由于序列化错误:(org.apache.kafka.connect.runtime.WorkerSinkTask)。 key.converter.schema.enable 应该保持假对吗?
  • schema.enable 不适用,JsonConverter 除外。进行配置更改后,您是否重新启动了 Kafka Connect 工作器?
  • 在您引用的行之后是否有更多关于错误的详细信息?
  • 用所有日志更新了问题。我在 docker 内部使用 S3 sink 连接器和 kafka 连接,所以每次都在旋转 docker 容器
  • 我已经编辑了我的答案,将@OneCricketeer 的建议将转换器配置嵌入到连接器配置本身中
【解决方案2】:

所以我能够解决这个问题。在明确指定转换器后,我能够解决反序列化错误,然后遇到了 S3 Multipart Upload 的问题,通过将 S3 IAM 策略附加到 ECS 任务定义,向 S3 存储桶授予 Fargate 任务权限来解决该问题。 感谢 Robin Moffatt 提供上述解决方案!

【讨论】:

    猜你喜欢
    • 2020-04-04
    • 2019-06-17
    • 2022-11-24
    • 2018-02-06
    • 2019-09-28
    • 2020-05-29
    • 2020-05-07
    • 2019-02-23
    • 2021-08-28
    相关资源
    最近更新 更多