【问题标题】:Kafka connect many to many tables in MSSQLKafka 连接 MSSQL 中的多对多表
【发布时间】:2019-02-14 00:40:25
【问题描述】:

我目前正在研究 Kafka Connect,以将我们的一些数据库流式传输到数据湖。为了测试 Kafka Connect,我设置了一个包含我们项目数据库之一的数据库。到目前为止一切都很好。

下一步我使用以下属性配置了 Kafka Connect:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "updated_at,created_at",
  "incrementing.column.name": "id",
  "dialect.name": "SqlServerDatabaseDialect",
  "validate.non.null": "false",
  "tasks.max": "1",
  "mode": "timestamp+incrementing",
  "topic.prefix": "mssql-jdbc-",
  "poll.interval.ms": "10000",
}

虽然这适用于我获得 ID 和 created_at / updated_at 字段的大多数表,但它不适用于我解决了多对多关系的表和一个复合表钥匙。请注意,我将通用 JDBC 配置与 Microsoft 的 JDBC 驱动程序一起使用。

有没有办法针对这些特殊情况配置 Kafka Connect?

【问题讨论】:

  • 您是否尝试过使用query 配置选项来编写连接,例如?否则,文档中的 Kafka 模式将使用 Kafka Streams 将您的“查找表”主题与“数据表”主题结合起来
  • @cricket_007 我不太明白。我的印象是,当您想要跟踪特定查询时,可以使用查询参数。就我而言,我想跟踪完整的数据库。可能是我错了。 :-)
  • 递增时间戳选项也是“特定查询”,不,我不确定我是否理解您的担忧

标签: sql-server apache-kafka apache-kafka-connect mssql-jdbc


【解决方案1】:

您可能需要创建多个连接器,而不是一个连接器来拉取所有表。如果您想使用不同的方法来获取数据或不同的 ID/时间戳列,就会出现这种情况。 正如@cricket_007 所说,您可以使用query 选项来拉回查询的结果——这可能是一个SELECT,表示您的多表连接。即使从单个表对象中提取数据,JDBC 连接器本身也只是从给定表中发出SELECT *,并使用WHERE 谓词来限制基于递增的 ID/时间戳选择的行。

另一种方法是使用基于日志的变更数据捕获 (CDC),并将所有变更直接从数据库流式传输到 Kafka。

无论您使用 JDBC 还是基于日志的 CDC,您都可以使用流处理来解析 Kafka 本身中的连接。这方面的一个例子是 Kafka Streams 或 KSQL。我写过关于后者的lot here

您可能还会发现 this article 详细描述了您将数据库与 Kafka 集成的选项。

免责声明:我为开源 KSQL 项目背后的公司 Confluent 工作。

【讨论】:

  • 在我们的例子中,自动增量标识列被命名为 _id。如果我正确阅读了您的第一条语句,这是否意味着我们需要为每个表使用不同的连接器,因为每个表的递增列名不同?
  • @suhas 是的。或使用基于日志的 CDC,例如debezium.io/documentation/reference/connectors/sqlserver.html
  • 其中一个数据库是 Galera 集群中的 MariaDB,它使用 InnoDB 引擎而不是 MySQL 引擎。无论如何,Debezium 不支持 MariaDB,所以我正在探索 JDBC 选项
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-12-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多