【发布时间】:2020-03-15 19:03:03
【问题描述】:
我正在尝试使用连接器连接 mysql 和 kafka。
当我运行bin/connect-standalone.sh config/connect-standalone.properties test.config 时,出现错误。
[2019-11-20 06:02:05,219] 错误无法为 test.config 创建作业(org.apache.kafka.connect.cli.ConnectStandalone:110) [2019-11-20 06:02:05,219] 错误在连接器错误后停止 (org.apache.kafka.connect.cli.ConnectStandalone:121) java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: 连接器配置 {"config"={, "database.user"="root",, "database.port"= "3306",, "include.schema.changes"="true", "database.server.name"="asgard",, "connector.class"="io.debezium.connector.mysql.MySqlConnector",, " database.history.kafka.topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"= "localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.客户",} 不包含连接器类型 在 org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79) 在 org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66) 在 org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118) 引起:org.apache.kafka.connect.runtime.rest.errors.BadRequestException: 连接器配置 {"config"={, "database.user"="root",, "database.port"="3306",, "include.schema.changes"="true","database.server.name"="asgard","connector.class"="io.debezium.connector.mysql.MySqlConnector","database.history.kafka .topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"="localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.customers",} 包含无连接器类型 在 org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287) 在 org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192) 在 org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115) [2019-11-20 06:02:05,221] 信息卡夫卡连接停止(org.apache.kafka.connect.runtime.Connect:66) [2019-11-20 06:02:05,221] 信息停止 REST 服务器(org.apache.kafka.connect.runtime.rest.RestServer:241) [2019-11-20 06:02:05,224] 信息停止 http_8083@2a7686a7{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341) [2019-11-20 06:02:05,225] INFO node0 停止清理(org.eclipse.jetty.server.session:167) [2019-11-20 06:02:05,226] INFO REST 服务器已停止(org.apache.kafka.connect.runtime.rest.RestServer:258) [2019-11-20 06:02:05,226] INFO Herder 停止 (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:98) [2019-11-20 06:02:05,226] INFO 工人停止(org.apache.kafka.connect.runtime.Worker:194) [2019-11-20 06:02:05,226] 信息停止 FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66) [2019-11-20 06:02:05,226] INFO Worker 停止(org.apache.kafka.connect.runtime.Worker:215) [2019-11-20 06:02:05,227] INFO Herder 停止(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:115) [2019-11-20 06:02:05,227] INFO Kafka Connect 已停止(org.apache.kafka.connect.runtime.Connect:71)
这是我的test.config:
{
"name": "mysql-source-demo-customers",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "dsm1234",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"include.schema.changes": "true"
}
}
这是我的connect-standalone.properties:
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/home/ec2-user/share/confluent-hub-components
错误日志显示“不包含连接器类型”。
我在 Stackoverflow 上发现了一个类似的问题并关注了它,但它对我不起作用或与我的案例无关。 (我又问了一个类似的问题,是关于plugin.path的)
【问题讨论】:
标签: apache-kafka