【Question Title】: Kafka Connect error: java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException
【Posted】: 2020-03-15 19:03:03
【Question Description】:

I am trying to connect MySQL to Kafka using a connector.

When I run `bin/connect-standalone.sh config/connect-standalone.properties test.config`, I get the following error.

[2019-11-20 06:02:05,219] ERROR Failed to create job for test.config (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-11-20 06:02:05,219] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"config"={, "database.user"="root",, "database.port"="3306",, "include.schema.changes"="true", "database.server.name"="asgard",, "connector.class"="io.debezium.connector.mysql.MySqlConnector",, "database.history.kafka.topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"="localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.customers",} contains no connector type
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"config"={, "database.user"="root",, "database.port"="3306",, "include.schema.changes"="true", "database.server.name"="asgard",, "connector.class"="io.debezium.connector.mysql.MySqlConnector",, "database.history.kafka.topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"="localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.customers",} contains no connector type
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2019-11-20 06:02:05,221] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)
[2019-11-20 06:02:05,221] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:241)
[2019-11-20 06:02:05,224] INFO Stopped http_8083@2a7686a7{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341)
[2019-11-20 06:02:05,225] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:167)
[2019-11-20 06:02:05,226] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:258)
[2019-11-20 06:02:05,226] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:98)
[2019-11-20 06:02:05,226] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:194)
[2019-11-20 06:02:05,226] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66)
[2019-11-20 06:02:05,226] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:215)
[2019-11-20 06:02:05,227] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:115)
[2019-11-20 06:02:05,227] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:71)

Here is my test.config:

{
  "name": "mysql-source-demo-customers",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "dsm1234",
      "database.server.id": "42",
      "database.server.name": "asgard",
      "table.whitelist": "demo.customers",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "dbhistory.demo" ,
      "include.schema.changes": "true"
  }
}

Here is my connect-standalone.properties:

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

plugin.path=/home/ec2-user/share/confluent-hub-components

The error log says "contains no connector type".
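The dumped config in the error hints at what happened: `connect-standalone.sh` reads the connector file as a Java `.properties` file, so JSON braces and quoted keys turn into literal property names like `{` and `"connector.class"`, and the plain `connector.class` key Connect validates against never exists. A rough Python imitation of that line-oriented parsing (my own simplified sketch, not Kafka's actual `java.util.Properties` implementation) reproduces exactly the odd entries seen in the log:

```python
def parse_like_properties(text):
    """Very rough imitation of java.util.Properties parsing: the first
    ':' or '=' on a line splits key from value; no JSON awareness."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        for i, ch in enumerate(line):
            if ch in ":=":
                props[line[:i].strip()] = line[i + 1:].strip()
                break
        else:
            props[line] = ""  # lines like '{' or '}' become bare keys

    return props

json_config = '''{
  "name": "mysql-source-demo-customers",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector"
  }
}'''

props = parse_like_properties(json_config)
print("connector.class" in props)    # → False: the key Connect looks for
print('"connector.class"' in props)  # → True: what it actually got
```

The quotes become part of the key, so validation finds no `connector.class` entry and reports "contains no connector type".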

I found a similar question on Stack Overflow and followed it, but it either did not work for me or was not relevant to my case. (I also asked another similar question, about plugin.path.)

【Discussion】:

    Tags: apache-kafka


    【Solution 1】:

    When I changed test.config as below, it worked. (JSON format -> plain properties format)

    test.config

    name=mysql-source-demo-customers
    tasks.max=1
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=localhost
    database.port=3306
    database.user=root
    database.password=dsm1234
    database.server.id=1234
    database.server.name=jin
    table.whitelist=demo.customers
    database.history.kafka.bootstrap.servers=localhost:9092
    database.history.kafka.topic=dbhistory.demo
    include.schema.changes=true
    

    I also tried renaming test.config to test.properties, but changing the file extension did not affect anything.
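    The rewrite above is mechanical: keep the top-level "name", then flatten every entry of the "config" map into key=value lines. A small Python sketch of that conversion (my own helper, not part of Kafka or Debezium):

```python
import json

def connector_json_to_properties(json_text):
    """Flatten a Connect REST-style JSON connector config into the
    key=value properties format that connect-standalone.sh expects."""
    cfg = json.loads(json_text)
    lines = ["name=" + cfg["name"]]
    for key, value in cfg.get("config", {}).items():
        lines.append(f"{key}={value}")
    return "\n".join(lines)

json_config = '''{
  "name": "mysql-source-demo-customers",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306"
  }
}'''

print(connector_json_to_properties(json_config))
```

    The JSON form from the question is not wrong in general; it is the request body the Connect REST API accepts (POST /connectors on a running worker), which is why both formats show up in tutorials. Only the standalone command-line launcher requires the properties format.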

    【Discussion】:
