【问题标题】:Publishing an AVRO messages to Kafka via Kafka REST通过 Kafka REST 将 AVRO 消息发布到 Kafka
【发布时间】:2021-02-08 00:10:02
【问题描述】:

我们在 Kubernetes 集群中部署了 Confluent Platform 6.0。我通过 Kafka REST api 创建了一个 Kafka 主题“test-topic-1”。现在我正在尝试向该主题发布一条简单的 AVRO 消息。

curl --location --request POST 'https://kafka-rest-master.k8s.hip.com.au/topics/test-topic-1' \
--header 'Content-Type: application/vnd.kafka.avro.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{"value_schema":{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]},"records":[{"value":{"name":"testUser"}}]}'

我收到此请求的 500 错误响应,

{"error_code":500,"message":"Internal Server Error"}

当我检查kafka rest pod的日志时,我可以看到以下错误,

ERROR 请求失败并出现异常 (io.confluent.rest.exceptions.DebuggableExceptionMapper) com.fasterxml.jackson.databind.exc.MismatchedInputException:不能 从 START_OBJECT 令牌中反序列化 java.lang.String 的实例 在 [来源: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); 行:1,列:17](通过参考链: io.confluent.kafkarest.entities.v2.SchemaTopicProduceRequest["value_schema"]) 在 com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) 在 com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1445)

我是否按照正确的步骤将 AVRO 消息发布到新创建的 Kafka 主题?如果是这样,这可能是什么问题?

【问题讨论】:

标签: apache-kafka avro confluent-platform kafka-rest


【解决方案1】:

您的 AVRO 架构定义错误。应该是这样定义的

{
  "type": "record",
  "name": "recordName",
  "namespace": "namespace",
  "doc": "description",
  "fields": [
    {
      "name": "key",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      }
    }
  ]
}

"fields": [
  {
    "name": "fiedName",
    "type": "string"
  },
]

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-05-15
    • 2020-08-26
    • 1970-01-01
    • 2016-08-28
    • 2016-11-10
    • 2018-10-28
    • 2018-10-27
    相关资源
    最近更新 更多