【问题标题】:Kafka Connect JDBC sink connector not workingKafka Connect JDBC 接收器连接器不起作用
【发布时间】:2018-02-06 07:12:05
【问题描述】:

我正在尝试使用 Kafka Connect JDBC 接收器连接器将数据插入 Oracle,但它会引发错误。我已经尝试了架构的所有可能配置。下面是例子。

如果我缺少以下任何内容,请建议我的配置文件和错误。

案例 1- 第一次配置

internal.value.converter.schemas.enable=false .

所以我得到了

[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)

[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)

[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)

org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)

   at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)

   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)

   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)

   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)

   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)

   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)

   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)

   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

   at java.util.concurrent.FutureTask.run(FutureTask.java:266)

   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

   at java.lang.Thread.run(Thread.java:748)

第二次配置 -

internal.key.converter.schemas.enable=true

internal.value.converter.schemas.enable=true

日志:

[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)

[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)

[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)

[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)

[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)

Oracle connector.properties 看起来像

name=oracle_sink

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

tasks.max=1

topics=DJ

connection.url=jdbc:oracle:thin:@hostname:port:sid

connection.user=username

connection.password=password

#key.converter=org.apache.kafka.connect.json.JsonConverter

#value.converter=org.apache.kafka.connect.json.JsonConverter

auto.create=true

auto.evolve=true

Connect-Standalone.properties

我的 JSON 看起来像 -

{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-connect


    【解决方案1】:

    the documentation

    接收器连接器需要架构知识,因此您应该使用合适的转换器,例如架构注册表附带的 Avro 转换器,或启用了架构的 JSON 转换器

    因此,如果您的数据是 JSON,您将具有以下配置:

    [...]
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    [...]
    

    您在第二个实例中看到的错误是相关的 - JsonConverter with schemas.enable requires "schema" and "payload" fields - 您共享的 JSON 不符合此要求的格式。

    下面是一个简单的带有schemapayload 的有效JSON 消息示例:

    {
        "schema": {
            "type": "struct",
            "fields": [{
                "type": "int32",
                "optional": true,
                "field": "c1"
            }, {
                "type": "string",
                "optional": true,
                "field": "c2"
            }, {
                "type": "int64",
                "optional": false,
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "create_ts"
            }, {
                "type": "int64",
                "optional": false,
                "name": "org.apache.kafka.connect.data.Timestamp",
                "version": 1,
                "field": "update_ts"
            }],
            "optional": false,
            "name": "foobar"
        },
        "payload": {
            "c1": 10000,
            "c2": "bar",
            "create_ts": 1501834166000,
            "update_ts": 1501834166000
        }
    }
    

    您尝试登陆 Oracle 的数据的来源是什么?如果是 Kafka Connect 入站,那么您只需使用相同的 converter 配置(Avro + Confluent Schema Registry)会更容易、更高效。如果它是一个自定义应用程序,您需要让它 (a) 使用 Confluent Avro 序列化器或 (b) 以上述所需格式编写 JSON,提供与消息内联的有效负载架构。

    【讨论】:

    • 我有 (15-20) 个 kafka 主题,每个主题都有不同的字段和不同的架构。我想使用 JDBC 接收器连接器,以便在 oracle 中为每个主题创建一个表。同样在我的情况下,JSON数据也有一些嵌套字段,我猜这些字段不是由接收器转换器处理的。通过包含架构和有效负载字段来更改每个主题的架构似乎不是一个好的选择。关于如何继续此用例的任何建议。
    • “通过包含架构和有效负载字段来更改每个主题的架构似乎不是一个好的选择” - 您必须这样做,或者使用 Avro。时期。接收器如何知道要为其加载数据的表的架构。您正在加载的数据的来源是什么?
    • 为什么您使用自定义 Kafka 生产者应用程序从 Oracle 读取数据,而不是 Kafka Connect?如果您使用 Kafka Connect 进行所有集成,生活会简单得多 :)
    • 这不能回答我的问题。您可以使用 Kafka Connect 从多个表中提取数据,可以使用单个连接器或多个连接器。
    • 我可以编写一个自定义模式注册表然后尝试读取 kafka 主题中的 JSON 数据并插入到 oracle 中。
    【解决方案2】:

    读完这篇文章后,我也遇到了同样的问题。我已经用 JDBC Sink MySQL 解决了 在我的 Kafka Connect 配置下方,作为附加信息:

    curl --location --request POST 'http://localhost:8083/connectors/' \
    --header 'Accept: application/json' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "name": "jdbc-sink",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "ttib-transactions",
            "connection.url": "jdbc:mysql://172.17.0.1:6603/tt-tran?verifyServerCertificate=true&useSSL=false",
            "connection.user": "root",
            "connection.password": "*******",
            "value.converter.schema.registry.url": "https://psrc-j55zm.us-central1.gcp.confluent.cloud",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "key.converter.schemas.enable": "false",
            "insert.mode": "insert",
            "batch.size":"0",
            "table.name.format": "${topic}",
            "pk.fields" :"id"
        }
    }'
    

    【讨论】:

      猜你喜欢
      • 2019-06-17
      • 2019-11-17
      • 2020-05-13
      • 2020-01-15
      • 2020-01-11
      • 2018-08-07
      • 2022-11-24
      • 2019-11-15
      • 2020-07-31
      相关资源
      最近更新 更多