【问题标题】:How to change the name of the topic generated by Kafka Connect Source Connector如何更改 Kafka Connect Source Connector 生成的主题名称
【发布时间】:2020-09-02 22:50:15
【问题描述】:

我有一个已经在运行的生产部署了 Kafka-Cluster,并且主题为“existing-topic”。我正在使用来自 Debezium 的 MongoDB-Source-Connector。

我想要的只是将 CDC 事件直接推送到主题“existing-topic”,以便已经在收听该主题的消费者将处理它。

我没有找到任何资源来做这件事,但是有人提到该主题是以以下格式创建的 -

"如果你的 mongodb.name 参数是 A,数据库名是 B,集合名是 C,那么数据库 A 和集合 C 的数据将被加载到主题 A.B.C 下"

我可以将主题更改为“existing-topic”并将事件推送到它吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect mongodb-kafka-connector


    【解决方案1】:

    根据documentation

    Kafka 主题的名称 always 采用以下形式 logicalName.databaseName.collectionName,其中logicalNamelogical namemongodb.name 指定的连接器 配置属性,databaseName 是数据库的名称,其中 操作发生,collectionName 是 MongoDB 的名称 受影响文档所在的集合。


    这意味着如果您的连接器的逻辑名称是 myConnector 并且您的数据库 myDatabase 有两个集合 usersorders

    {
      "name": "myConnector",  
      "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
        "mongodb.hosts": "mongo-db-host:27017", 
        "mongodb.name": "myDatabase", 
        "collection.whitelist": "myDatabase[.]*", 
      }
    }
    

    然后 Kafka Connect 将使用名称填充两个主题:

    • myConnector.myDatabase.users
    • myConnector.myDatabase.orders

    现在,如果您仍想更改目标主题的名称,可以使用 Kafka Connect Single Message Transforms (SMT)。更准确地说,ExtractTopic 应该可以帮助您。请注意,尽管此 SMT 可以帮助您从消息的键或值中提取主题名称,因此您需要以某种方式在有效负载中包含所需的主题名称。

    例如,以下 SMT 将提取字段 myField 的值并将其用作记录的主题:

     transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
     transforms.ValueFieldExample.field=myField
    

    【讨论】:

      【解决方案2】:

      我在使用 JDBC 源连接器时遇到了同样的问题,但找到了不同的解决方案:

      RegexRouter 单消息转换与dropPrefix 一起使用,您可以覆盖整个主题名称:

      "transforms":"dropPrefix",
      "transforms.dropPrefix.regex":"A.B.C",                 // whole created topic name
      "transforms.dropPrefix.replacement":"existing-topic"   // whole exisiting topic name
      

      它适用于正则表达式,因此如果您使用多个表/集合并且您创建的主题名称不是恒定的,您应该能够使其动态化。

      这有点 hacky,因为从技术上讲,我要删除整个主题名称,然后添加一个新主题名称 - 无论如何,这对我来说并不是最好的解决方案。

      【讨论】:

      • 如果我在 JDBC 接收器连接器中使用此转换并且我正在使用此转换,如果我想将此转换的输出作为我的目标表,我应该为参数“table.name.format”赋予什么值名称
      • 尚未尝试转换,我需要在 mongo 源连接器中应用它。将检查它如何适合它。
      猜你喜欢
      • 2021-08-07
      • 2021-03-25
      • 2021-07-26
      • 1970-01-01
      • 2021-07-26
      • 2022-12-04
      • 2020-07-15
      • 1970-01-01
      • 2020-12-29
      相关资源
      最近更新 更多