【发布时间】:2019-01-21 00:50:38
【问题描述】:
我正在尝试使用 Kafka 连接接收器将文件从 Kafka 写入 HDFS。
我的属性如下:
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
flush.size=3
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
schema.compatability=BACKWARD
key.converter.schemas.enabled=false
value.converter.schemas.enabled=false
schemas.enable=false
当我尝试运行连接器时,出现以下异常:
org.apache.kafka.connect.errors.DataException:带有 schemas.enable 的 JsonConverter 需要“schema”和“payload”字段,并且可能不包含其他字段。如果您尝试反序列化纯 JSON 数据,请在转换器配置中设置 schemas.enable=false。
我使用的是 Confluent 4.0.0 版。
有什么建议吗?
【问题讨论】:
-
@cricket_007,如果我的 Json 没有“模式”和“有效负载”,我该如何编写 Parquet 文件?
-
我不认为你可以。 Parquet 需要 Schema,上次我检查时,来自 Confluent 的 Kafka Connect 代码使用 Avro 库帮助将 Kafka 消息转换为 Parquet 文件
-
您需要使用模式注册表将 Avro 生成到主题中。否则,您必须将架构字段添加到 JSON 消息。或者,使用 JSONFormat 而不是 Parquet,然后使用 Hive、Spark 以及稍后转换为 Parquet 的任何内容。在您选择的任何选项中,都需要定义架构,但这不是 Connect 框架中添加的属性
-
听起来你明白了。还有更多选项,例如使用 Kafka Streams 或 KSQL 将 JSON 主题转换为 Avro 主题,然后使用 Connect,但前提是您无法更改生产者代码并且能够可靠地部署这些服务
-
我没有尝试过,但这就是错误试图告诉你的内容
标签: apache-kafka hdfs apache-kafka-connect confluent-platform