【问题标题】:Not able to use Kafka's JdbcSourceConnector to read data from Oracle DB to kafka topic无法使用 Kafka 的 JdbcSourceConnector 将数据从 Oracle DB 读取到 kafka 主题
【发布时间】:2019-06-18 05:33:28
【问题描述】:

我正在尝试使用 kafka-jdbc-connect API 编写一个独立的 java 程序,将数据从 oracle-table 流式传输到 kafka 主题。

使用的 API:我目前正在尝试使用 Kafka 连接器,确切地说是 JdbcSourceConnector 类。

约束:使用 Confluent Java API,而不是通过 CLI 或执行提供的 shell 脚本。

我所做的:创建一个 JdbcSourceConnector.java 类的实例,并通过提供 Properties 对象作为参数来调用该类的 start(Properties) 方法。该属性对象具有数据库连接属性、表白名单属性、主题前缀等。

启动线程后,我无法从“topic-prefix-tablename”主题中读取数据。我不确定如何将 Kafka Broker 详细信息传递给 JdbcSourceConnector。在 JdbcSourceConnector 启动线程上调用 start() 方法但不执行任何操作。 是否有一个简单的 java API 教程页面/示例代码可以参考,因为我看到的所有示例都使用 CLI/shell 脚本?

感谢任何帮助

代码:

    public static void main(String[] args) {

        Map<String, String> jdbcConnectorConfig = new HashMap<String, String>();
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "<DATABASE_URL>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "<DATABASE_USER>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "<DATABASE_PASSWORD>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "300000");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG, "10");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.MODE_CONFIG, "timestamp");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "<TABLE_NAME>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, "<TABLE_COLUMN_NAME>");
        jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-oracle-jdbc-");


        JdbcSourceConnector jdbcSourceConnector = new JdbcSourceConnector ();
        jdbcSourceConnector.start(jdbcConnectorConfig);

    }

【问题讨论】:

  • 请分享您目前制作的代码
  • 你为什么要从你自己的程序中运行它?为什么不按预期在 Kafka Connect 下运行它? confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector
  • 作为战术解决方案,我必须将数据从 oracle 加载到 cassandra 数据存储。甚至 confluent-platform 也可以作为我们公司的服务提供,并且与它相关联。我们不想花钱购买战术解决方案。
  • @Deepak 您无需为 Kafka Connect 支付任何许可。

标签: apache-kafka apache-kafka-connect


【解决方案1】:

假设您尝试在独立模式下进行。

在您的应用程序运行配置中,您的主类应该是“org.apache.kafka.connect.cli.ConnectStandalone”,并且您需要传递两个属性文件作为程序参数。

您还应该使用“org.apache.kafka.connect.source.SourceConnector”类扩展“your-custom-JdbcSourceConnector”类

主类: org.apache.kafka.connect.cli.ConnectStandalone

程序参数: .\path-to-config\connect-standalone.conf .\path-to-config\connetcor.properties

“connect-standalone.conf” 文件将包含所有 Kafka 代理详细信息。

// Example connect-standalone.conf
bootstrap.servers=<comma seperated brokers list here>

group.id=some_loca_group_id

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=connect.offset

offset.flush.interval.ms=100
offset.flush.timeout.ms=180000
buffer.memory=67108864
batch.size=128000
producers.acks=1

connector.properties”文件将包含创建和启动连接器所需的所有详细信息

// Example connector.properties
name=some-local-connector-name
connector.class=your-custom-JdbcSourceConnector    
tasks.max=3
topic=output-topic
fetchsize=10000

更多信息在这里:https://docs.confluent.io/current/connect/devguide.html#connector-example

【讨论】:

    猜你喜欢
    • 2021-09-12
    • 2020-09-18
    • 2021-11-20
    • 2023-03-18
    • 1970-01-01
    • 2021-06-11
    • 2020-07-17
    • 2018-01-09
    • 1970-01-01
    相关资源
    最近更新 更多