【问题标题】:kafka connect hdfs sink connector is failingkafka connect hdfs sink连接器失败
【发布时间】:2019-01-21 00:50:38
【问题描述】:

我正在尝试使用 Kafka 连接接收器将文件从 Kafka 写入 HDFS。

我的属性如下:

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false

当我尝试运行连接器时,出现以下异常:

org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。

我使用的是 Confluent 4.0.0 版。

有什么建议吗?

【问题讨论】:

  • @cricket_007,如果我的 Json 没有“模式”和“有效负载”,我该如何编写 Parquet 文件?
  • 我不认为你可以。 Parquet 需要 Schema,上次我检查时,来自 Confluent 的 Kafka Connect 代码使用 Avro 库帮助将 Kafka 消息转换为 Parquet 文件
  • 您需要使用模式注册表将 Avro 生成到主题中。否则,您必须将架构字段添加到 JSON 消息。或者,使用 JSONFormat 而不是 Parquet,然后使用 Hive、Spark 以及稍后转换为 Parquet 的任何内容。在您选择的任何选项中,都需要定义架构,但这不是 Connect 框架中添加的属性
  • 听起来你明白了。还有更多选项,例如使用 Kafka Streams 或 KSQL 将 JSON 主题转换为 Avro 主题,然后使用 Connect,但前提是您无法更改生产者代码并且能够可靠地部署这些服务
  • 我没有尝试过,但这就是错误试图告诉你的内容

标签: apache-kafka hdfs apache-kafka-connect confluent-platform


【解决方案1】:

我对这个问题的理解是,如果你设置了 schemas.enable=true,就是告诉 kafka 你想将 schema 包含到 kafka 必须传输的消息中。在这种情况下,kafka 消息没有纯 json 格式。相反,它首先描述模式,然后附加与模式相对应的有效负载(即实际数据)(阅读有关 AVRO 格式的信息)。这会导致冲突:一方面您为数据指定了 JsonConverter,另一方面您要求 kafka 将架构包含到消息中。要解决此问题,您可以使用带有 schemas.enable = true 的 AvroConverter 或带有 schemas.enable=false 的 JsonCONverter。

【讨论】:

  • 我认为 AvroConverter 并不关心 schemas.enable 设置,因为它始终假定架构可用并且在架构注册表中
猜你喜欢
  • 2018-12-29
  • 2018-06-13
  • 2019-07-15
  • 1970-01-01
  • 2017-04-07
  • 2019-06-19
  • 2020-07-31
  • 2020-01-07
  • 2020-06-11
相关资源
最近更新 更多