【问题标题】:How to get the table-name and database-name in the CDC event received from debezium kafka connect如何在从 debezium kafka connect 收到的 CDC 事件中获取表名和数据库名
【发布时间】:2019-09-20 11:06:24
【问题描述】:

设置:我在 MS SQL Server 上启用了 CDC,并且使用 debezium kafka connect(source) 将 CDC 事件馈送到 Kafka。此外,多个表 CDC 事件被路由到 Kafka 中的单个主题。

问题:由于我在kafka topic里有多个表数据,所以想在CDC数据中有表名和数据库名。

我在 MySQL CDC 中获取表名和数据库名,但在 MS SQL CDC 中没有。

下面是 SQL Server 的 Debezium 源连接器

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "cdc-user_profile-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "<<hostname>>",
    "database.port": "<<port>>",
    "database.user": "<<username>>",
    "database.password": "<<password>>",
    "database.server.name": "test",
    "database.dbname": "testDb",
    "table.whitelist": "schema01.table1,schema01.table2",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "digital.user_profile.schema.audit",
    "database.history.store.only.monitored.tables.ddl": true,
    "include.schema.changes": false,
    "event.deserialization.failure.handling.mode": "fail",
    "snapshot.mode": "initial_schema_only",
    "snapshot.locking.mode": "none",
    "transforms":"addStaticField,topicRoute",
    "transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addStaticField.static.field":"source_system",
    "transforms.addStaticField.static.value":"source_system_1",
    "transforms.topicRoute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRoute.regex":"(.*)",
    "transforms.topicRoute.replacement":"digital.user_profile",
    "errors.tolerance": "none",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}'

我得到以下输出(演示数据)

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

我的要求是得到如下

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false,
    "db": "testDb",
    "table": "table1/table2"
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

【问题讨论】:

  • 您可以为每个表创建一个连接器,然后添加static.field 以添加到表中。
  • 感谢@cricket_007,这将是在推出issues.jboss.org/browse/DBZ-875 之前最好的解决方法。

标签: sql-server apache-kafka apache-kafka-connect cdc debezium


【解决方案1】:

这是https://issues.jboss.org/browse/DBZ-875问题的一部分

【讨论】:

  • 谢谢@jiri,这个功能什么时候推出。
  • 在下一个版本中 - 我估计在 3 周内。
【解决方案2】:

Debezium Kafka-Connect 通常将每个表中的数据放在单独的主题中,主题名称的格式为 hostname.database.table。我们一般使用主题名来区分源表和数据库名。

如果您要将所有表中的数据手动放入一个主题中,那么您可能还必须手动添加表和数据库名称。

【讨论】:

  • 我如何在 CDC 事件中手动添加它?。向 kafka 主题添加多个表数据的原因是因为这是一个企业数据管道,可能有 100 个表。如果我们继续增加这样的主题,那么 kafka 集群的性能将会下降。此外,kafka connect 通过设置这两个属性 CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY: "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" 来提供此功能。
猜你喜欢
  • 2020-07-09
  • 2023-01-26
  • 2019-05-13
  • 2016-08-14
  • 2018-11-23
  • 2020-11-28
  • 2019-10-23
  • 2021-03-07
  • 2020-10-08
相关资源
最近更新 更多