【问题标题】:Creating and using a custom kafka connect configuration provider创建和使用自定义 kafka 连接配置提供程序
【发布时间】:2020-01-19 14:04:52
【问题描述】:

我已经在分布式模式下安装并测试了 kafka connect,它现在可以工作,它连接到配置的接收器并从配置的源读取。 在这种情况下,我开始改进我的安装。我认为需要立即关注的一个领域是创建连接器,唯一可用的方法是通过 REST 调用,这意味着我需要通过线路发送我的信息,不受保护。 为了确保这一点,kafka 引入了新的 ConfigProvider,见于here。 这很有帮助,因为它允许在服务器中设置属性,然后在 rest 调用中引用它们,如下所示:

{
.
.
"property":"${file:/path/to/file:nameOfThePropertyInFile}"
.
.
}

这真的很好用,只需在服务器上添加属性文件并在distributed.properties文件中添加以下配置:

config.providers=file   # multiple comma-separated provider types can be specified here
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

虽然此解决方案有效,但它确实无助于缓解我对安全性的担忧,因为信息现在从通过网络发送到现在位于存储库中,并且每个人都可以看到文本。 kafka 团队预见到了这个问题,并允许客户端生成自己的配置提供程序来实现接口 ConfigProvider。 我创建了自己的实现并打包在一个 jar 中,并为其命名:

META-INF/services/org.apache.kafka.common.config.ConfigProvider

并在分发的文件中添加了以下条目:

config.providers=cust
config.providers.cust.class=com.somename.configproviders.CustConfigProvider

但是我从连接中得到一个错误,说明一个实现 ConfigProvider 的类,名称为:

com.somename.configproviders.CustConfigProvider

找不到。 我现在不知所措,因为他们网站上的文档没有明确说明如何很好地配置自定义配置提供程序。

有没有人研究过类似的问题并且可以提供一些见解?任何帮助将不胜感激。

【问题讨论】:

标签: apache-kafka apache-kafka-connect


【解决方案1】:

我最近刚刚通过这些设置了一个自定义 ConfigProvider。官方文档含糊不清。

我已经创建了自己的实现并打包在一个 jar 中,并为其指定最终名称: META-INF/services/org.apache.kafka.common.config.ConfigProvider

你可以随意命名jar的最终名称,但需要打包成带有.jar后缀的jar格式。

这里是完整的一步一步。假设您的自定义 ConfigProvider 完全限定名称是 com.my.CustomConfigProvider.MyClass。 1.在目录下创建一个文件:META-INF/services/org.apache.kafka.common.config.ConfigProvider。文件内容是全限定类名: com.my.CustomConfigProvider.MyClass

  1. 包含你的源代码,以及上面的 META-INF 文件夹以生成 Jar 包。如果您使用的是 Maven,文件结构类似于 this

  2. 将您的最终 Jar 文件(例如 custom-config-provider-1.0.jar)放在 Kafka worker 插件文件夹下。默认为 /usr/share/java。 Kafka worker 配置文件中的 PLUGIN_PATH。

  3. 也将所有依赖 jar 文件上传到 PLUGIN_PATH。使用 Jar 文件中的 META-INFO/MANIFEST.MF 文件来配置代码将使用的依赖 jar 的“ClassPath”。

  4. 在 kafka worker 配置文件中,创建两个附加属性:

CONNECT_CONFIG_PROVIDERS: 'mycustom', // Alias name of your ConfigProvider
CONNECT_CONFIG_PROVIDERS_MYCUSTOM_CLASS:'com.my.CustomConfigProvider.MyClass',
  1. 重启工作器

  2. 通过将 POST 卷曲到 Kafka Restful API 来更新您的连接器配置文件。在连接器配置文件中,您可以使用以下语法引用从ConfigProvider:get(path, keys) 返回的ConfigData 内的值:

database.password=${mycustom:/path/pass/to/get/method:password}

ConfigData 是一个包含 {password: 123} 的 HashMap

  1. 如果您仍然看到 ClassNotFound 异常,可能是您的 ClassPath 设置不正确。

注意: • 如果您使用AWS ECS/EC2,您需要通过设置环境变量来设置工作器配置文件。 • worker 配置和连接器配置文件不同。

【讨论】:

  • @Carlos 如何在 REST api 调用中使用默认 ConfigProvider 读取的变量?我的新文件 secret.properties 定义了这个: mongodbConnectionUri =mongodb://username:pwd@localhost:27017,... POST localhost:8083/connectors body: { "name":"mongo-connector", "config": { " connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": } } 在我的 connect-distributed.propertier 中添加:connection.uri= ${file:/opt/secret.properties:mongodbConnectionUri} 没有它我无法创建连接器。
  • 问题是 Windows 中的绝对路径。用冒号解析路径失败。所以我给出了相对路径,它工作正常。我想知道,如何在这里传递 windows 绝对路径,我尝试转义冒号,但它没有用。 "db.username":"${file:C:\\Users\\user\\config\\myfile.properties:dbusername}" 我必须在上面更改为:"db.username":"${file:config \\myfile.properties:dbusername}" 我正在从目录启动 Kafka-Connect:C:\\Users\user
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-08-04
  • 1970-01-01
  • 1970-01-01
  • 2022-08-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多