【问题标题】:Kafka JDBC Source connector: create topics from column valuesKafka JDBC Source 连接器:从列值创建主题
【发布时间】:2020-03-31 20:40:44
【问题描述】:

我有一个微服务,它使用 OracleDB 在EVENT_STORE 表中发布系统更改。表EVENT_STORE 包含一个列TYPE,其中包含事件类型的名称。

JDBC Source Kafka Connect 是否有可能获取 EVENT_STORE 表更改并使用 KAFKA-TOPIC 中的列 TYPE 的值发布它们?

这是我的源卡夫卡连接器配置:

{
  "name": "kafka-connector-source-ms-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@localhost:1521:xe",
    "connection.user": "squeme-name",
    "connection.password": "password",
    "topic.prefix": "",
    "table.whitelist": "EVENT_STORE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "CREATE_AT",
    "incrementing.column.name": "ID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.retry.timeout": "0",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "none",
    "errors.log.enable": "false",
    "errors.log.include.messages": "false",
    "connection.attempts": "3",
    "connection.backoff.ms": "10000",
    "numeric.precision.mapping": "false",
    "validate.non.null": "true",
    "quote.sql.identifiers": "ALWAYS",
    "table.types": "TABLE",
    "poll.interval.ms": "5000",
    "batch.max.rows": "100",
    "table.poll.interval.ms": "60000",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "UTC"
  }
}

【问题讨论】:

    标签: jdbc apache-kafka apache-kafka-connect


    【解决方案1】:

    您可以尝试使用ExtractTopic 转换从字段中提取主题名称

    将以下属性添加到 JSON

    transforms=ValueFieldExample
    transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
    transforms.ValueFieldExample.field=TYPE
    

    【讨论】:

    • 它是否也适用于 spoolDirSource 连接器?它从配置中获取 topic 字段,因为 topic 值在融合 UI 中是必需的。它不承认转换。有什么帮助吗?
    • @Gibbs 转换适用于所有连接器。此转换从值中获取主题,并应覆盖源连接器设置为写入的主题。不过,我建议为您的特定问题创建一个新帖子
    • 当然。我去做。我会创建一个。
    猜你喜欢
    • 2019-11-08
    • 2021-09-01
    • 2019-11-24
    • 2018-05-01
    • 2018-09-14
    • 2020-08-08
    • 1970-01-01
    • 2017-10-12
    • 1970-01-01
    相关资源
    最近更新 更多