【问题标题】:Loading data into oracle table using Kafka jdbc sink使用 Kafka jdbc sink 将数据加载到 oracle 表中
【发布时间】:2018-02-20 05:33:10
【问题描述】:

我正在尝试使用 JDBC 接收器连接器将数据从 Kafka 加载到 Oracle,以复制 confluent 网站中提到的示例:

https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html

        name=jdbc-sink
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        tasks.max=1

        # The topics to consume from - required for sink connectors like this one
        topics=orders

        # Configuration specific to the JDBC sink connector.
        # We want to connect to a SQLite database stored in the file test.db and 
        auto-create tables.
        #connection.url=jdbc:sqlite:test.db
        connection.url=jdbc:oracle:thin:@XXXX:XXXX/XXXXX
        connection.user=XXXX
        connection.password=XXXXX
        auto.create=true
        auto.evolve=true
        pk.mode=record_value
        insert.mode=insert
        pk.fields=id
        #fields.whitelist=product,quantity,price
        batch.size=0

错误

# ./confluent status jdbc-sink
    {"name":"jdbc-sink","connector":{"state":"RUNNING","worker_id":"10.87.40.165:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='product', isPrimaryKey=false}, as it is not optional and does not have a default value
            at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:137)
            at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
            at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
            at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
            at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
            ... 10 more
    ","id":0,"worker_id":"10.87.40.165:8083"}],"type":"sink"}            

我可以看到使用消费者消耗的数据,但数据没有加载到 oracle 表中。我将主题名称更改为 UPPERCASE 并尝试过,但仍然没有用。我也添加了auto.evolve=true 选项,但它不起作用。 在这里找到了类似的帖子,我在没有太多帮助的情况下浏览了它。

我只是在自动创建表。我可以看到在 oracle 中创建的表,但没有数据。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    查看您的堆栈跟踪,这是错误:

    Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, 
    name='product', isPrimaryKey=false}, as it is not optional and does not have 
    a default value
    

    因此,您尝试加载的数据中包含字段product,没有默认值,并且您的目标表没有该列。

    【讨论】:

    • 不,它没有丢失,我可以看到使用所有字段创建的表。这是它的 DDL CREATE TABLE "orders" ("product" CLOB NOT NULL ENABLE, "quantity" NUMBER(10 ,0) NOT NULL ENABLE, "price" BINARY_FLOAT NOT NULL ENABLE, "id" NUMBER(10,0) NOT NULL ENABLE, PRIMARY KEY ("id") USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 TABLESPACE "USERS" ENABLE )
    • 它以 STRING 的形式出现,而您已将其定义为 CLOB - 这可能会导致您看到的问题。更改 DDL(或您的源架构),看看是否有帮助
    • 我可以通过为字段添加“默认值”来解决它。 {“名称”:“产品”,“类型”:“字符串”,“默认”:“aaa”}。我在源代码中定义为“字符串”,但是在创建表时它创建为“CLOB”,不确定我应该如何创建为“字符串”?我已将选项设为自动创建
    猜你喜欢
    • 2022-01-08
    • 2018-11-02
    • 2019-06-04
    • 2020-07-14
    • 2021-11-20
    • 1970-01-01
    • 2021-12-21
    • 2021-05-27
    • 1970-01-01
    相关资源
    最近更新 更多