【问题标题】:Confluent InfluxDB sink connectorConfluent InfluxDB 接收器连接器
【发布时间】:2019-08-08 21:15:13
【问题描述】:

我正在尝试使用 Confluent InfluxDB sink connector 将主题中的数据获取到我的 InfluxDB 中。配置如下:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1

当我通过 Kafka Connect UI 创建新连接器时,我得到的只是以下异常:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

主题中的值是这样的 json 字符串:{"pid":1,"filename":"test1.csv"}。这里有什么我遗漏的配置吗?

更新:这是我的工作人员配置:

config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter

【问题讨论】:

  • 你也可以分享你的工人properties文件

标签: apache-kafka-connect confluent-platform


【解决方案1】:

InfluxDB 连接器要求数据中存在模式,因此如果您有 JSON 数据,则需要设置

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

但是您的 JSON 需要包含架构,所以不是

{"pid":1,"filename":"test1.csv"}

你需要类似的东西

{
    "schema": {
        "type": "struct", "optional": false, "version": 1, "fields": [
            { "field": "pid", "type": "string", "optional": true },
            { "field": "filename", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "pid": 1,
        "filename": "test1.csv"
    }
}

参考:https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/

有关如何将架构应用于数据的更多详细信息,请参阅this blog

有关一般转换器的更多详细信息,请参阅this article

【讨论】:

  • 谢谢,不知道您可以为每个连接器配置转换器。我使用您提供的属性创建了一个新配置。 UI 中有一个警告:Warning: Config "value.converter.schemas.enable" is unknown,现在堆栈跟踪显示Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
  • 您使用的是什么版本的 Confluent Platform?
  • 抱歉回复晚了。我们使用 5.1.2
  • @user6845507 你解决问题了吗?如果有,解决办法是什么?
猜你喜欢
  • 1970-01-01
  • 2021-08-08
  • 2021-08-30
  • 2021-08-14
  • 2021-08-25
  • 1970-01-01
  • 2019-01-24
  • 2021-01-05
  • 2021-10-19
相关资源
最近更新 更多