【Question Title】: kafka jdbc source connector: timestamp mode is not working for sqlite3
【Posted】: 2019-10-31 23:05:29
【Description】:

I set up a database with a table that has a timestamp column, and I am trying to use timestamp mode to capture incremental changes from the database.

But kafka-connect-jdbc is not reading any data from the table. Here is what I did.

Created a table:

sqlite> CREATE TABLE test_timestamp(id integer primary key not null,
   ...>                   payment_type text not null,
   ...>                   Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
   ...>                   user_id int not null);
sqlite> INSERT INTO test_timestamp (ID, PAYMENT_TYPE, USER_ID) VALUES (3,'FOO',1);
sqlite> select * from test_timestamp;
3|FOO|2019-06-18 05:31:22|1
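Note that `CURRENT_TIMESTAMP` stores a seconds-precision string (`2019-06-18 05:31:22` above), while the sqlite-jdbc driver's timestamp parser expects a fractional-seconds component (see the error in the answer below). A minimal sketch, using Python's stdlib `sqlite3` to show an alternative column default that stores milliseconds:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# STRFTIME's %f emits seconds with a fractional part (e.g. "08:28:43.123"),
# so the stored value matches the "yyyy-MM-dd HH:mm:ss.SSS" shape the
# sqlite-jdbc driver parses.
conn.execute("""CREATE TABLE test_timestamp(
    id integer primary key not null,
    payment_type text not null,
    Timestamp DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'now')),
    user_id int not null)""")
conn.execute("INSERT INTO test_timestamp (id, payment_type, user_id) VALUES (3, 'FOO', 1)")
ts = conn.execute("SELECT Timestamp FROM test_timestamp").fetchone()[0]
print(ts)  # e.g. "2019-07-03 08:28:43.123" -- note the fractional seconds
```

This is a workaround sketch, not the connector's documented fix; it only changes what new rows store.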

My jdbc-source connector is configured as follows:

$ curl -s "http://localhost:8083/connectors/jdbc-source/config"|jq '.'
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "timestamp",
  "topic.prefix": "testdb-",
  "validate.non.null": "false",
  "tasks.max": "1",
  "name": "jdbc-source",
  "connection.url": "jdbc:sqlite:/tmp/test.db"
}
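One thing worth double-checking (also raised in the comments below): the config names the column `timestamp` in lowercase, while the DDL declares it as `Timestamp`. A small sketch using Python's stdlib `sqlite3` to list the actual column names; whether the case mismatch matters depends on the connector and driver, but matching the declared spelling exactly rules it out:

```python
import sqlite3

# Reproduce the table from the question, then compare the configured
# timestamp.column.name against the column names SQLite actually reports.
config = {"timestamp.column.name": "timestamp"}

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE test_timestamp(
    id integer primary key not null,
    payment_type text not null,
    Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
    user_id int not null)""")
# PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
columns = [row[1] for row in conn.execute("PRAGMA table_info(test_timestamp)")]
print(columns)                                      # ['id', 'payment_type', 'Timestamp', 'user_id']
print(config["timestamp.column.name"] in columns)   # False -- declared as 'Timestamp'
```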

The jdbc-source connector loads successfully and creates the topic:

$ kafka-topics --list --bootstrap-server localhost:9092
..
testdb-test_timestamp

But there is no data in the topic.

Any help?

Thanks in advance.

【Comments】:

  • Could you try including "query": "select * from test_timestamp" in the config?
  • Yes, I have already added that to the config file, but no luck.
  • Looks like this could help you: stackoverflow.com/questions/54518763/… Your timestamp column is named "Timestamp", not "timestamp"; try changing it.

Tags: sqlite jdbc apache-kafka apache-kafka-connect confluent-platform


【Solution 1】:

You are hitting a known issue, described in detail here: https://github.com/confluentinc/kafka-connect-jdbc/issues/219

Steps to reproduce:

  1. Create the database:

    $ echo 'DROP TABLE test_timestamp; CREATE TABLE test_timestamp(id integer primary key not null,
                     payment_type text not null,
                     Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                     user_id int not null);
    INSERT INTO test_timestamp (ID, PAYMENT_TYPE, USER_ID) VALUES (3,'\''FOO'\'',1);
    select * from test_timestamp;' | sqlite3 /tmp/test.db
    3|FOO|2019-07-03 08:28:43|1
    
  2. Create the connector:

    curl -X PUT "http://localhost:8083/connectors/jdbc-source/config" -H  "Content-Type:application/json"  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp",
    "timestamp.column.name": "timestamp",
    "topic.prefix": "testdb-",
    "validate.non.null": "false",
    "tasks.max": "1",
    "name": "jdbc-source",
    "connection.url": "jdbc:sqlite:/tmp/test.db"
    }'
    
  3. Check the connector status:

    $ curl -s "http://localhost:8083/connectors"| \
    jq '.[]'| \
    xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
    jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
    column -s : -t| sed 's/\"//g'| sort
    jdbc-source  |  RUNNING  |  RUNNING
    
  4. Check the Kafka Connect worker log and observe the date-parsing error, per issue 219:

    [2019-07-03 10:40:58,260] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="test_timestamp", query='null', topicPrefix='testdb-', incrementingColumn='', timestampColumns=[timestamp]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:332)
    java.sql.SQLException: Error parsing time stamp
        at org.sqlite.jdbc3.JDBC3ResultSet.getTimestamp(JDBC3ResultSet.java:576)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.currentTimeOnDB(GenericDatabaseDialect.java:484)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.endTimetampValue(TimestampIncrementingTableQuerier.java:203)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.setQueryParametersTimestamp(TimestampIncrementingCriteria.java:164)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.setQueryParameters(TimestampIncrementingCriteria.java:126)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:92)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:60)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:310)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.text.ParseException: Unparseable date: "2019-07-03 08:40:58" does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q \E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
        at org.sqlite.date.FastDateParser.parse(FastDateParser.java:299)
        at org.sqlite.date.FastDateFormat.parse(FastDateFormat.java:490)
        at org.sqlite.jdbc3.JDBC3ResultSet.getTimestamp(JDBC3ResultSet.java:573)
        ... 17 more
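
The `ParseException` above is the key: `CURRENT_TIMESTAMP` writes `yyyy-MM-dd HH:mm:ss`, but the driver's pattern demands a fractional-seconds component after a dot. A rough Python approximation of the `FastDateParser` pattern from the stack trace shows the mismatch:

```python
import re

# Approximate Python translation of the pattern in the stack trace:
# (\p{Nd}++)-(\p{Nd}++)-(\p{Nd}++) (\p{Nd}++):(\p{Nd}++):(\p{Nd}++).(\p{Nd}++)
# i.e. the fractional ".SSS" part is mandatory.
pattern = re.compile(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)\.(\d+)")

print(bool(pattern.fullmatch("2019-07-03 08:40:58")))      # False: no fractional part
print(bool(pattern.fullmatch("2019-07-03 08:40:58.000")))  # True
```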
    

【Comments】:

    【Solution 2】:

    Stumbled upon a similar issue. In my case, even the topic was not created. It turned out that the Connect worker's timezone must match the database's timezone. Setting the db.timezone property in the Connect worker's properties file to a valid name from the list (under the SHORT_IDS section) made it work:

    db.timezone=Asia/Kolkata
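
    Note that db.timezone is also documented as a connector-level property in kafka-connect-jdbc, and its value must be a full TZ database name (a Java ZoneId); three-letter abbreviations like "IST" are only resolved through Java's ZoneId.SHORT_IDS map. A minimal sketch, assuming Python 3.9+, that validates the name locally before submitting the connector config:

```python
import json
from zoneinfo import ZoneInfo  # Python 3.9+, reads the system tz database

# Source config from the question, with db.timezone added.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp",
    "timestamp.column.name": "Timestamp",
    "topic.prefix": "testdb-",
    "connection.url": "jdbc:sqlite:/tmp/test.db",
    "db.timezone": "Asia/Kolkata",
}
tz = ZoneInfo(config["db.timezone"])  # raises ZoneInfoNotFoundError if invalid
print(json.dumps(config, indent=2))
```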
    

    【Comments】:
