【问题标题】:Kafka JDBC Connect (Source and Sink) and InformixKafka JDBC 连接(源和接收器)和 Informix
【发布时间】:2021-05-07 19:16:05
【问题描述】:

有什么方法可以将“:”转换为“.”在由 jdbc 连接器构造的查询中?

"SELECT * FROM 数据库:用户:表 WHERE 数据库:用户:表"

连接器配置:

"name": "jdbc_source_connector",
"config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
    "topic.prefix" : "table-",
    "poll.interval.ms" : "100000",
    "mode" : "incrementing",
    "table.whitelist" : "table",
    "query.suffix" : ";",
    "incrementing.column.name" : "lp"

错误:

[2021-02-03 14:03:02,809] INFO Begin using SQL query: SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table : lp  > ? ORDER B
Y  database  :  user  :  table : lp  ASC ; (io.confluent.connect.jdbc.source.TableQuerier:164)
[2021-02-03 14:03:02,853] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="database "." user "." table", query='null', top
icPrefix='database-', incrementingColumn='lp', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:404)
java.sql.SQLSyntaxErrorException: A syntax error has occurred.

连接器会话:

informix database:/usr/informix$ onstat -g ses 544271

IBM Informix Dynamic Server Version 12.10.FC13 -- On-Line -- Up 60 days 18:26:06 -- 4985440 Kbytes

session           effective                                               #RSAM    total      used       dynamic
id       user     user      tty      pid      hostname                    threads  memory     memory     explain
544271   user      -         -        1266352  kafkahost                  1        172032     102672     off

Program :
Thread[id:175, name:task-thread-jdbc_source_connector_database, path:/app/kafka_2.13-2.7.0/plugins/kafka-connect-jdbc-10.0.1/lib/jdbc-4.50.4.1.jar]

tid      name     rstcb            flags    curstk   status
583042   sqlexec  7000000437451a8  Y--P---  6224     cond wait  netnorm   -

Memory pools    count 2
name         class addr              totalsize  freesize   #allocfrag #freefrag
544271       V     70000004f6c7040  167936     68592      113        35
544271*O0    V     700000065ad0040  4096       768        1          1

name           free       used           name           free       used
overhead       0          6656           scb            0          144
opentable      0          11352          filetable      0          1040
log            0          16536          temprec        0          22688
keys           0          816            gentcb         0          1592
ostcb          0          3472           sqscb          0          25128
hashfiletab    0          552            osenv          0          2056
sqtcb          0          9336           fragman        0          640
sapi           0          144            udr            0          520

sqscb info
scb              sqscb            optofc   pdqpriority optcompind  directives
7000000343d3360  700000039194028  0        0           0           1

Sess       SQL            Current            Iso Lock       SQL  ISAM F.E.
Id         Stmt type      Database           Lvl Mode       ERR  ERR  Vers  Explain
544271     -              database                CR  Not Wait   0    0    9.28  Off

Last parsed SQL statement :
  SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table :
    lp  > ? ORDER BY  database  :  user  :  table : lp  ASC

【问题讨论】:

  • 为什么要使用非本地表示法编写查询?这真的很正常吗? Informix 使用[database[@server]:][owner.]tablename 本地识别表(所以database@server:owner.tablename 可以,database@server:tablenamedatabase:owner.tablenamedatabase:tablenameowner.tablename 或普通tablename 也是如此)。识别列时事情可能会变得棘手,但符号基本上使用点将表名与列名分开。我对 Kafka 一无所知(对 JDBC 也知之甚少),但这感觉有点像“医生,医生,好痛……”。
  • 问题是,这个查询是由连接器自动生成的,连接器插入“:”而不是“.”
  • 您可以使用 Kafka 中的query 选项来指定 Informix 所需的语法吗? docs.confluent.io/3.0.0/connect/connect-jdbc/docs/…
  • 是的。查询选项工作正常
  • 谢谢布赖恩·休斯!

标签: jdbc apache-kafka apache-kafka-connect informix


【解决方案1】:

因此,唯一的解决方法是将查询选项添加到连接器配置中

修改后的配置:

"name": "jdbc_source_connector",
"config": {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
    "topic.prefix" : "table-tablename",
    "poll.interval.ms" : "100000",
    "mode" : "incrementing",
    "query" : "select * from table ",
    "incrementing.column.name" : "lp"

【讨论】:

    猜你喜欢
    • 2021-09-09
    • 2017-10-12
    • 2019-06-17
    • 2020-01-11
    • 2021-08-30
    • 2019-11-15
    • 2020-01-15
    • 2019-11-17
    • 2018-05-01
    相关资源
    最近更新 更多