【发布时间】:2019-06-18 05:33:28
【问题描述】:
我正在尝试使用 kafka-jdbc-connect API 编写一个独立的 java 程序,将数据从 oracle-table 流式传输到 kafka 主题。
使用的 API:我目前正在尝试使用 Kafka 连接器,确切地说是 JdbcSourceConnector 类。
约束:使用 Confluent Java API,而不是通过 CLI 或执行提供的 shell 脚本。
我所做的:创建一个 JdbcSourceConnector.java 类的实例,并通过提供 Properties 对象作为参数来调用该类的 start(Properties) 方法。该属性对象具有数据库连接属性、表白名单属性、主题前缀等。
启动线程后,我无法从“topic-prefix-tablename”主题中读取数据。我不确定如何将 Kafka Broker 详细信息传递给 JdbcSourceConnector。在 JdbcSourceConnector 启动线程上调用 start() 方法但不执行任何操作。 是否有一个简单的 java API 教程页面/示例代码可以参考,因为我看到的所有示例都使用 CLI/shell 脚本?
感谢任何帮助
代码:
public static void main(String[] args) {
Map<String, String> jdbcConnectorConfig = new HashMap<String, String>();
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "<DATABASE_URL>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "<DATABASE_USER>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "<DATABASE_PASSWORD>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG, "300000");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG, "10");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.MODE_CONFIG, "timestamp");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "<TABLE_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, "<TABLE_COLUMN_NAME>");
jdbcConnectorConfig.put(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG, "test-oracle-jdbc-");
JdbcSourceConnector jdbcSourceConnector = new JdbcSourceConnector ();
jdbcSourceConnector.start(jdbcConnectorConfig);
}
【问题讨论】:
-
请分享您目前制作的代码
-
你为什么要从你自己的程序中运行它?为什么不按预期在 Kafka Connect 下运行它? confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector
-
作为战术解决方案,我必须将数据从 oracle 加载到 cassandra 数据存储。甚至 confluent-platform 也可以作为我们公司的服务提供,并且与它相关联。我们不想花钱购买战术解决方案。
-
@Deepak 您无需为 Kafka Connect 支付任何许可。
标签: apache-kafka apache-kafka-connect