【问题标题】:How to connecting kafka to mysql?如何将kafka连接到mysql?
【发布时间】:2019-08-16 17:50:30
【问题描述】:

我正在尝试在 Windows 上将 kafka 与 mysql 连接起来。我没有使用融合。我的 kafka 版本是 2.12 我已经启动了 zookeeper、Kafka、生产者和消费者,这一切都很好。

我的 MySQL 版本是 8.0.15

我已经将这 3 个 jar 文件复制到 libs 文件夹中

mysql-connector-java-8.0.15.jar
mysql-connector-java-5.1.47.jar
mysql-connector-java-5.1.47-bin.jar

我的source-quickstart-mysql.properties文件代码是

name=test-source-mysql-jdbc-autoincrement        connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/databasename? 
user=rootname&password=password
mode=incrementing
incrementing.column.name=ID
topic.prefix=my-replicated-topic-table1

当我运行命令时

connect-standalone.bat ..\..\config\connect-standalone.properties  ..\..\config\source-quickstart-mysql.properties

我在控制台上收到此错误

[2019-03-26 16:16:39,524] 错误无法为 ....\config\source-quickstart-mysql.properties 创建作业 (org.apache.kafka.connect.cli.ConnectStandalone) [2019-03-26 16:16:39,524] 连接器错误后停止错误(org.apache.kafka.connect.cli.ConnectStandalone) java.util.concurrent.ExecutionException:org.apache.kafka.connect.errors.ConnectException:找不到任何 实现连接器且名称匹配的类 io.confluent.connect.jdbc.JdbcSourc eConnector,可用的连接器有:PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', 版本='2.1.0',编码 edVersion=2.1.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSource 连接器',版本='2.1.0',编码版本=2.1.0,类型=源,类型名称='源',位置='类路径'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, 名称='org.apache.kafka.co nnect.tools.MockConnector',版本='2.1.0',编码版本=2.1.0,类型=连接器,类型名称='连接器', location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector',版本='2.1.0',encodedVersion=2.1.0,type=sink,typeName='sink', location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tool s.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', 版本='2.1.0',编码版本=2.1.0,类型=来源,类型名称='来源', location='classpath'},PluginDesc{klass=class o rg.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', 版本='2.1.0',编码版本=2.1.0,类型=来源,类型名称='来源', 位置='类 路径'},PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', 版本='2.1.0',编码版本=2.1.0,类型=源 , typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', 版本='2 .1.0',encodedVersion=2.1.0,type=source,typeName='source',location='classpath'} 在 org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79) 在 org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66) 在 org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110) 引起:org.apache.kafka.connect.errors.ConnectException:找不到任何 实现连接器且名称匹配的类 io.confluent.connect.jdbc.JdbcSourceConnector,可用的connecto rs 是:PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', 版本='2.1.0',编码版本=2.1.0,类型=接收器,ty peName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', 版本='2.1.0',e ncodedVersion=2.1.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', ve rsion='2.1.0',encodedVersion=2.1.0,type=connector,typeName='connector',location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, 名称='org.apache.kafka.connec t.tools.MockSinkConnector',版本='2.1.0',encodedVersion=2.1.0,type=sink,typeName='sink', location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='o rg.apache.kafka.connect.tools.MockSourceConnector',版本='2.1.0',encodedVersion=2.1.0,type=source,typeName='source', location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools。 SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', 版本='2.1.0',编码版本=2.1.0,类型=来源,类型名称='来源', location='classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', 版本='2.1.0',编码版本=2.1.0,类型=来源,类型名称='来源', 位置= 'classpath'},PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', 版本='2.1.0',编码版本=2.1.0,t 类型=来源,类型名称='来源',位置='类路径'} 在 org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:179) 在 org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:382) 在 org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:261) 在 org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189) 在 org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)

请帮帮我。

我也尝试过这篇文章,但没有使用命令生成输出 bin/confluent 加载 jdbc-source -d jdbc-source.properties https://supergloo.com/kafka-connect/kafka-connect-mysql-example/

【问题讨论】:

标签: mysql apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

你的错误是

 org.apache.kafka.connect.errors.ConnectException: Failed to find any class that 
 implements Connector and which name matches io.confluent.connect.jdbc.JdbcSourceConnector

既然你说你没有使用 Confluent Platform 是有道理的,因为 kafka-connect-jdbc 不是 Apache Kafka 的一部分。您可以使用 Confluent Platform,从 source 构建连接器,或者在 http://hub.confluent.io 下载它。

【讨论】:

  • 我收到一个新错误。这次我的数据库位于另一个 Linux 节点中,假设 xxx.xx.xxx.xxx:3306 并且我正在从我的 Windows 机器运行 kafka connect 的独立命令。我的 server.properties 代码是 name=test-source-mysql-jdbc-autoincrement connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://xxx.xx.xxx .xxx:3306/databasename?user=username &password=password mode=incrementing incrementing.column.name=columnname topic.prefix=MYSQL-databasename-database-–
  • 错误:2019 年 3 月 28 日星期四 09:49:00 IST 警告:不建议在没有服务器身份验证的情况下建立 SSL 连接。根据 MySQL 5.5.45+、5.6.26+ 和 5.7.6+ 的要求,如果未设置显式选项,则默认情况下必须建立 SSL 连接。为了符合不使用 SSL 的现有应用程序,verifyServerCertificate 属性设置为“false”。您需要通过设置 useSSL=false 来显式禁用 SSL,或者设置 useSSL=true 并为服务器证书验证提供信任库。 java.lang.IllegalArgumentException:组数必须为正数。
【解决方案2】:

如果在 kafka lib 路径中添加 kafka-connect-jdbc-5.5.1.jar 并重启 kafka zookeeper 和服务器。您应该可以连接。

【讨论】:

    【解决方案3】:

    您可以在https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc 下载 Kafka Connect JDBC,它是免费的,并且可以在没有 Confluent 平台的情况下运行。解压后,更新 confluentinc-kafka-connect-jdbc-5.5.1.jar 的位置,以便在已安装的 Apache Kafka 的 config 目录下的 connect-standalone.properties 中找到键 plugin.path。再次运行脚本后,错误将消失。

    【讨论】:

      猜你喜欢
      • 2021-04-16
      • 1970-01-01
      • 2023-01-24
      • 2021-07-27
      • 2021-05-06
      • 2020-04-09
      • 2017-08-11
      • 2017-12-07
      • 1970-01-01
      相关资源
      最近更新 更多