【发布时间】:2019-09-28 05:18:53
【问题描述】:
我有一个数据库 (Mariadb) 关系,其中有一列“修改”为“bigint(10)”,表示时间戳,我相信 unix 时间格式。当我尝试使用“时间戳”或“时间戳+递增”模式运行 kafka 源连接器时,不会将任何事件推送到主题中。如果我只运行增量,新条目将被推送到主题。有人可以提示我在哪里错误地配置了连接器吗?还是连接器无法识别 unix 时间格式的时间戳?
我尝试运行具有以下属性的连接器(仅基于时间戳的检索):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name":"only_ts",
"config": {
"numeric.mapping": "best_fit",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mariadb/moodle",
"connection.user": "user",
"connection.password": "",
"topic.prefix": "only_ts_",
"mode": "timestamp",
"timestamp.column.name":"modified",
"table.whitelist":"mdl_forum_posts",
"poll.intervals.ms": 10000
}
}'
每当我创建条目或更新条目时,我希望看到来自“mdl_forum_posts”的条目被推送到 kafka 主题“only_ts_mdl_forum_posts”中。然而,使用这个连接器,什么也没有发生。 如果我只使用“递增”模式,这可以正常工作并且符合预期。但要获得 DB UPDATES,我还需要添加模式时间戳。
“描述 mdl_forum_posts”的输出
+---------------+--------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+----------------+
| id | bigint(10) | NO | PRI | NULL | auto_increment |
| discussion | bigint(10) | NO | MUL | 0 | |
| parent | bigint(10) | NO | MUL | 0 | |
| userid | bigint(10) | NO | MUL | 0 | |
| created | bigint(10) | NO | MUL | 0 | |
| modified | bigint(10) | NO | | 0 | |
| mailed | tinyint(2) | NO | MUL | 0 | |
| subject | varchar(255) | NO | | | |
| message | longtext | NO | | NULL | |
| messageformat | tinyint(2) | NO | | 0 | |
| messagetrust | tinyint(2) | NO | | 0 | |
| attachment | varchar(100) | NO | | | |
| totalscore | smallint(4) | NO | | 0 | |
| mailnow | bigint(10) | NO | | 0 | |
| deleted | tinyint(1) | NO | | 0 | |
+---------------+--------------+------+-----+---------+----------------+
“show create table moodle.mdl_forum_posts;”的输出:
| mdl_forum_posts | CREATE TABLE mdl_forum_posts (
id bigint(10) NOT NULL AUTO_INCREMENT,
discussion bigint(10) NOT NULL DEFAULT '0',
parent bigint(10) NOT NULL DEFAULT '0',
userid bigint(10) NOT NULL DEFAULT '0',
created bigint(10) NOT NULL DEFAULT '0',
modified bigint(10) NOT NULL DEFAULT '0',
mailed tinyint(2) NOT NULL DEFAULT '0',
subject varchar(255) NOT NULL DEFAULT '',
message longtext NOT NULL,
messageformat tinyint(2) NOT NULL DEFAULT '0',
messagetrust tinyint(2) NOT NULL DEFAULT '0',
attachment varchar(100) NOT NULL DEFAULT '',
totalscore smallint(4) NOT NULL DEFAULT '0',
mailnow bigint(10) NOT NULL DEFAULT '0',
deleted tinyint(1) NOT NULL DEFAULT '0',
PRIMARY KEY (id),
KEY mdl_forupost_use_ix (userid),
KEY mdl_forupost_cre_ix (created),
KEY mdl_forupost_mai_ix (mailed),
KEY mdl_forupost_dis_ix (discussion),
KEY mdl_forupost_par_ix (parent)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |
“已修改”列中的示例条目是:
select modified from mdl_forum_posts;
1557487199
这是一个unix时间的时间戳,如下所示:
select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59
关于相关连接器的相关日志(仅时间戳)似乎显示了一些查询?
kafka-connect_1 | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1 | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
【问题讨论】:
-
如果你使用 just
timestamp是否有效? Kafka Connect 工作器日志中是否出现错误? -
好点,并且:不,它不适用于时间戳。如果我并行启动“仅递增”连接器,则该连接器有效。据我所知,日志没有显示任何异常。连接器状态为“正在运行”。谢谢!
-
你可以分享你的配置,当你尝试只用
timestamp时?你的表 DDL 呢? -
嗨,我在发送时添加了连接器配置以及 DDL。感谢您的帮助!
-
@toscio:找到解决方案有运气吗?我个人得到零星的结果。
标签: jdbc apache-kafka apache-kafka-connect