【Question Title】: Unable to write Kafka topic data into a Postgres DB using the JDBC kafka-sink-connector
【Posted】: 2021-11-20 06:48:32
【Question Description】:

I have a Kafka topic to which we produce Avro records with the schema shown below.

{
  "type": "record",
  "name": "testValue",
  "namespace": "com.test.testValue",
  "fields": [
    {
        "name": "A",
      "type": "string"
    },
    {
      "name": "B",
      "type": "string"
    },
    {
      "name": "C",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 8,
        "scale": 2
      }
    },
    {
      "name": "D",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "E",
      "type": [
        {
          "type": "record",
          "name": "F",
          "fields": [
            {
              "name": "G",
              "type": {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 8,
                "scale": 2
              }
            }
          ]
        },
        {
          "type": "record",
          "name": "H",
          "fields": [
            {
              "name": "dummy",
              "type": "boolean",
              "default": true
            }
          ]
        },
        {
          "type": "record",
          "name": "I",
          "fields": [
            {
              "name": "J",
              "type": {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 8,
                "scale": 2
              }
            }
          ]
        }
      ]
    },
    {
      "name": "K",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "L",
      "type": "boolean"
    }
  ]
}
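For illustration, here is a rough Python sketch of what a single record conforming to this schema might look like (all field values are made up). The point is that field E is a union of three record types (F, H, I), so whichever branch is present, the value of E is itself a nested record, which Kafka Connect models as a STRUCT:

```python
from datetime import datetime, timezone
from decimal import Decimal

# Hypothetical record matching the Avro schema above (values invented).
record = {
    "A": "order-123",           # string
    "B": "EUR",                 # string
    "C": Decimal("199.99"),     # bytes / decimal(precision=8, scale=2)
    "D": datetime(2021, 11, 20, 6, 48, tzinfo=timezone.utc),  # timestamp-millis
    "E": {"G": Decimal("10.50")},  # union branch F -> a nested record (STRUCT)
    "K": datetime(2021, 11, 20, 6, 49, tzinfo=timezone.utc),  # timestamp-millis
    "L": True,                  # boolean
}

# The nested value under E is what the JDBC sink cannot map to a column type.
print(type(record["E"]))  # a dict, i.e. a nested structure
```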

I have the following configuration for the connector.

 {
 "name" : "test-sink-database",
 "config" : {
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "connection.url": "jdbc:postgresql://database_url/postgres",
  "topics": "test",
  "connection.user": "postgres",
  "connection.password": "password",
  "table.name.format": "test_table",
  "auto.create": "true",
  "schema.registry.url": "http://schema-registry:8081",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "name": "test-sink-database",
  "value.converter":"io.confluent.connect.avro.AvroConverter",
  "key.converter":"io.confluent.connect.avro.AvroConverter",
  "insert.mode":"insert"
 }
}

I am getting the following error.

Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.connect.avro.Union (STRUCT) type doesn't have a mapping to the SQL database column type

Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.connect.avro.Union (STRUCT) type doesn't have a mapping to the SQL database column type
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1727)
    at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:215)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1643)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1632)
    at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
    at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1634)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1557)
    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)

【Question Discussion】:

Tags: postgresql apache-kafka avro apache-kafka-connect confluent-platform


【Solution 1】:

I believe this is because the field E is of type record. The error states (STRUCT) type doesn't have a mapping to the SQL database column type. Your E is defined as follows:

    "name": "E",
          "type": [
            {
              "type": "record",
              "name": "F",
              "fields": [
                {
                  "name": "G",
                  "type": {
                    "type": "bytes",
                    "logicalType": "decimal",
                    "precision": 8,
                    "scale": 2
                  }
                }
              ]
            },
            {
              "type": "record",
              "name": "H",
              "fields": [
                {
                  "name": "dummy",
                  "type": "boolean",
                  "default": true
                }
              ]
            },
            {
              "type": "record",
              "name": "I",
              "fields": [
                {
                  "name": "J",
                  "type": {
                    "type": "bytes",
                    "logicalType": "decimal",
                    "precision": 8,
                    "scale": 2
                  }
                }
              ]
            }
          ]
        }
    

The JDBC sink cannot handle deeply nested data structures. You could try the Cast Single Message Transform to convert the field to a string and check whether it is then pushed into the database correctly. Another approach is to flatten the value with a different SMT, e.g.

    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value"
    
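For the Cast approach mentioned above, a hedged sketch of what the extra connector config might look like (the field name E is taken from the schema; whether Cast accepts a struct-typed field is worth verifying against the Connect documentation):

```json
"transforms": "cast",
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "E:string"
```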

【Comments】:

• Thanks for the quick support. I added the above configuration, but now I get the following error: Caused by: java.lang.NullPointerException at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
• Have a look at issues.apache.org/jira/browse/KAFKA-6605 and try setting "transforms.flatten.delimiter": "_"
• It gives me the same error, i.e. Caused by: java.lang.NullPointerException
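For context, a rough Python sketch (not Connect code) of what the Flatten transform with a "_" delimiter does conceptually: nested struct fields are promoted to top-level fields whose names join the path with the delimiter, so a column like E_G can then be mapped to an SQL type.

```python
def flatten(value, delimiter="_", prefix=""):
    """Recursively flatten nested dicts into a single-level dict,
    joining nested field names with the delimiter."""
    flat = {}
    for key, val in value.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(val, dict):
            flat.update(flatten(val, delimiter, name))
        else:
            flat[name] = val
    return flat

nested = {"A": "x", "E": {"G": 10.5}}
print(flatten(nested))  # {'A': 'x', 'E_G': 10.5}
```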