【问题标题】:Error while pushing JSON data from Kafka using Kafka connect to InfluxDB使用 Kafka 连接到 InfluxDB 从 Kafka 推送 JSON 数据时出错
【发布时间】:2020-03-10 05:51:57
【问题描述】:

我正在尝试使用 Kafka-connectJSON 数据从 Kafka 推送到 InfluxDB。 Kafka 版本 - 2.2.0,使用 Kafka-connect 与 apache-kafka 一起出现。

工人属性:

bootstrap.servers=localhost:9092
rest.port=8083
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets1
offset.storage.replication.factor=1
offset.storage.partitions=50
config.storage.topic=connect-configs1
config.storage.replication.factor=1
status.storage.topic=connect-status1
status.storage.replication.factor=1
status.storage.partitions=10
offset.storage.file.filename=/data/md0/kafka/data/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
consumer.max.poll.records=10000

在启动 REST curl 命令和数据推送时,日志抛出以下异常:

[2019-11-14 09:41:33,294] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
        at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
        at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
        at io.confluent.influxdb.sink.InfluxDBSinkTask.writeRecordsToDB(InfluxDBSinkTask.java:230)
        at io.confluent.influxdb.sink.InfluxDBSinkTask.put(InfluxDBSinkTask.java:112)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        ... 10 more
[2019-11-14 09:41:33,295] ERROR WorkerSinkTask{id=FlinkAggsInfluxDBSink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

【问题讨论】:

    标签: json apache-kafka influxdb apache-kafka-connect confluent-platform


    【解决方案1】:

    事实上,错误表明它想要cast to org.apache.kafka.connect.data.Struct,那么您需要模式作为该数据的一部分,而不是纯 JSON。

    因此,您可以

    1. producer Avro 数据到topic,并使用AvroConverter
    2. 更新您的生产者以使用{"schema": ..., "payload": ... } 格式的JSON 和value.converter.schemas.enable=true
    3. 找到支持纯 JSON 的 another InfluxDB sink

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-05-27
      • 2023-01-24
      • 2020-09-01
      • 1970-01-01
      • 2020-04-27
      • 2017-02-07
      • 2017-02-08
      相关资源
      最近更新 更多