【Question Title】:Failed to deserialize data for topic
【Posted】:2019-11-27 19:59:26
【Question】:

I am using the Confluent cp-all-in-one project configuration from here: https://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one/docker-compose.yml

I am sending a message to http://localhost:8082/topics/zuum-positions with the following AVRO body:

{
  "key_schema": "{\"type\":\"string\"}",
  "value_schema": "{\"type\":\"record\",\"name\":\"Position\",\"fields\":[{\"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{\"name\":\"lon\",\"type\":\"double\"}]}",
  "records": [
    {
      "key": "22",
      "value": {
        "lat": 43.33,
        "lon": 43.33,
        "loadId": 22
      }
    }
  ]
}

I have added the following headers to the POST request above:

Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json
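For reference, the request can be reproduced with curl along these lines (a sketch; it assumes the REST Proxy is listening on localhost:8082 as in the linked compose file):

```shell
# Produce one Avro record via the Kafka REST Proxy (assumed at localhost:8082)
curl -X POST http://localhost:8082/topics/zuum-positions \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  -d '{
    "key_schema": "{\"type\":\"string\"}",
    "value_schema": "{\"type\":\"record\",\"name\":\"Position\",\"fields\":[{\"name\":\"loadId\",\"type\":\"double\"},{\"name\":\"lat\",\"type\":\"double\"},{\"name\":\"lon\",\"type\":\"double\"}]}",
    "records": [{"key": "22", "value": {"lat": 43.33, "lon": 43.33, "loadId": 22}}]
  }'
```

On success the proxy responds with the offsets and schema ids it registered; the exception below happens later, on the consume side.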

When I execute this request, I see the following exception in the docker logs:

Error encountered in task zuum-sink-positions-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='zuum-positions', partition=0, offset=25, timestamp=1563480487456, timestampType=CreateTime}. org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic zuum-positions to Avro: 
connect            |    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
connect            | Caused by: java.net.ConnectException: Connection refused (Connection refused)
connect            |    at java.net.PlainSocketImpl.socketConnect(Native Method)
connect            |    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
connect            |    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
connect            |    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
connect            |    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
connect            |    at java.net.Socket.connect(Socket.java:589)
connect            |    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
connect            |    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
connect            |    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
connect            |    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
connect            |    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
connect            |    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
connect            |    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
connect            |    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
connect            |    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
connect            |    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
connect            |    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
connect            |    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
connect            |    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
connect            |    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
connect            |    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
connect            |    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
connect            |    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)

I have spent several hours on this but cannot find the cause. Normally this error occurs when Connect cannot reach the Schema Registry, but I kept their configuration from here: https://github.com/confluentinc/cp-docker-images/blob/5.2.2-post/examples/cp-all-in-one/docker-compose.yml#L77

Can you help?

【Comments】:

  • Is this the whole stack trace?
  • @cricket_007 Just updated the stack trace
  • @Robin Moffatt Any suggestions?

Tags: docker apache-kafka confluent-platform confluent-schema-registry kafka-rest


【Solution 1】:

Problem solved.

Basically, the Kafka message was saved to the topic successfully, but when my JDBC sink connector tried to parse it and copy it into our MySQL database, it could not connect to the schema registry URL.

Previous connector config:

{
  "name": "zuum-sink-positions",
  "key.converter.schema.registry.url": "http://localhost:8081", 
  "value.converter.schema.registry.url": "http://localhost:8081", 
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schemas.enable":"false",
  "value.converter.schemas.enable":"true",
  "config.action.reload": "restart",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "print.key": "true",
  "errors.tolerance": "all",
  "topics": "zuum-positions",
  "connection.url": "jdbc:mysql://ip:3306/zuum_tracking",
  "connection.user": "user",
  "connection.password": "password",
  "auto.create": "true"
}

Updated the schema registry URL with the correct host:

{
  "name": "zuum-sink-positions",
  "key.converter.schema.registry.url": "http://schema-registry:8081", 
  "value.converter.schema.registry.url": "http://schema-registry:8081", 
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schemas.enable":"false",
  "value.converter.schemas.enable":"true",
  "config.action.reload": "restart",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "print.key": "true",
  "errors.tolerance": "all",
  "topics": "zuum-positions",
  "connection.url": "jdbc:mysql://ip:3306/zuum_tracking",
  "connection.user": "user",
  "connection.password": "password",
  "auto.create": "true"
}
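A quick sanity check for this fix (a sketch, assuming the service names from the linked compose file) is to confirm that the Connect container can actually reach the registry by its Docker service name:

```shell
# Run curl inside the connect container; note that localhost:8081 inside
# that container is NOT the registry, which is why the original config failed
docker-compose exec connect curl -s http://schema-registry:8081/subjects
# A JSON array of registered subjects means the registry is reachable;
# "Connection refused" here reproduces the error from the logs above.
```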

【Discussion】:

  • Here you basically changed the schema registry url. Could you tell me what url is in your schema registry properties? I am trying to do the same thing, ran into multiple issues, and ended up here. Could you also post your sink and source configs, along with the database table that serves as your source? Here is the link to my question - stackoverflow.com/questions/67316149/…
  • Thanks, I had also spent hours on this; what solved my problem was this line: "errors.tolerance": "all". Verified that all the data was good, and the error went away.
【Solution 2】:

Assuming the problem is with the message key, the alternative answer is to change the Connect container environment variables:

CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081

Otherwise, if the problem is only with the value, then you only need to specify converter settings in the connector JSON when you want to override the defaults set in the Compose file / connect-distributed.properties. In other words, you could have removed the localhost values entirely.
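For the value-only case, a minimal per-connector override might look like this fragment (a sketch; only the converter-related keys are shown, the rest of the sink config stays as before):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```

Any key not present in the connector JSON falls back to the worker-level defaults.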

【Discussion】:

【Solution 3】:

Confluent Connect does not retry on this error, and if your tasks fail because of it they may be removed from the cluster. This error can also show up in production clusters running Confluent 5.5.x or earlier; I ran into it last night, see below -


    Graceful stop of task failed (Executing stage VALUE_CONVERTER)
    [2022-01-09 01:11:27,890] ERROR Graceful stop of task xyz_s3-sink209_2222_MESSAGE-0 failed. (org.apache.kafka.connect.runtime.Worker:746)
    [2022-01-09 01:11:31,589] ERROR Error encountered in task xyz_s3-sink209_2222_MESSAGE-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='b0213_BC_MESSAGE', partition=0, offset=6007846, timestamp=1641664763873, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:62)
    [2022-01-09 01:11:31,591] ERROR WorkerSinkTask{id=xyz_s3-sink209_2222_MESSAGE-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
    [2022-01-09 01:11:31,594] ERROR WorkerSinkTask{id=xyz_s3-sink209_2222_MESSAGE-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
    

The full error may look like this -

    {"id":0,"state":"FAILED","worker_id":"xyz.co.uk:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic xyz_2222_MESSAGE to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:118)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 32818\nCaused by: java.net.SocketTimeoutException: Read timed out\n\tat java.net.SocketInputStream.socketRead0(Native Method)\n\tat java.net.SocketInputStream.socketRead(SocketInputStream.java:116)\n\tat java.net.SocketInputStream.read(SocketInputStream.java:171)\n\tat java.net.SocketInputStream.read(SocketInputStream.java:141)\n\tat java.io.BufferedInputStream.fill(BufferedInputStream.java:246)\n\tat java.io.BufferedInputStream.read1(BufferedInputStream.java:286)\n\tat java.io.BufferedInputStream.read(BufferedInputStream.java:345)\n\tat sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)\n\tat sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)\n\tat sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)\n\tat sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)\n\tat java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659)\n\tat io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)\n\tat io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)\n\tat 
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.getSubject(AbstractKafkaAvroDeserializer.java:297)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:286)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:151)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:162)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:101)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"}
    

You may see this error if the Schema Registry instance is not running on all nodes behind the load balancer in your cluster.

Any Connect task that tries to deserialize data from Kafka will keep failing until the Schema Registry is healthy and reachable again.

Once you have verified that the Kafka brokers and the Schema Registry are back up and healthy, you can unload and restart the Connect tasks on the affected Connect nodes to resolve the issue.
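The failed tasks can be inspected and restarted through the Connect REST API; a sketch (the connector name my-sink and the Connect host are placeholders for your own):

```shell
# Check connector and task state (look for "state":"FAILED" and the trace field)
curl -s http://connect:8083/connectors/my-sink/status

# Restart a specific failed task (task id 0 here); repeat for each failed task
curl -s -X POST http://connect:8083/connectors/my-sink/tasks/0/restart
```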

【Discussion】:
