【Question Title】: Kafka Connect JDBC Sink "schema does not exist"
【Posted】: 2020-09-23 14:57:05
【Question Description】:

I am trying to sink data into PostgreSQL using Kafka Connect, but I am getting an error saying that the schema does not exist.

Could the dots in the topic name be causing the problem? The error says schema "logstash" does not exist, and "logstash" is exactly the part of the topic name up to the first dot.

Error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: schema "logstash" does not exist
  Position: 14

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    ... 10 more
Caused by: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: schema "logstash" does not exist
  Position: 14

    ... 12 more

Sink configuration:

{
  "name": "jdbc.apache.access.log.sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "logstash.apache.access.log",
    "connection.url": "jdbc:postgresql://<IP_OF_POSTGRESQL>:5432/kafka",
    "connection.user": "kafka",
    "connection.password": "<PASSWORD>",
    "insert.mode": "upsert",
    "pk.mode": "kafka",
    "auto.create": true,
    "auto.evolve": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true"
  }
}

Schema (retrieved via an API call):

{
  "subject": "logstash.apache.access.log-value",
  "version": 3,
  "id": 3,
  "schema": "{\"type\":\"record\",\"name\":\"log\",\"namespace\":\"value_logstash.apache.access\",\"fields\":[{\"name\":\"clientip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"verb\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"response\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"request\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"bytes\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}

Edit:

I tried creating a new topic with underscores instead. It does look like the dots are the cause of the error. Is there a solution that avoids this, or is something wrong with my configuration...?

【Question Comments】:

  • FWIW, neither of the `converter.schemas.enable` properties has any effect here. 1) StringConverter has no schema. 2) Avro always has a schema, and that cannot be disabled.

Tags: apache-kafka apache-kafka-connect confluent-schema-registry


【Solution 1】:

You should be able to use the RegexRouter SMT to rename the topic to something without periods before the sink operation writes to the database.
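As a sketch, adding something like the following to the sink connector's `"config"` block would route records from the dotted topic name to an underscore-separated table name before the JDBC write (the transform alias `dropDots` and the replacement name `apache_access_log` are arbitrary choices, not anything from the question):

```json
{
  "transforms": "dropDots",
  "transforms.dropDots.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropDots.regex": "logstash\\.apache\\.access\\.log",
  "transforms.dropDots.replacement": "apache_access_log"
}
```

With `auto.create` enabled, the connector should then create and write to a table named `apache_access_log` instead of trying to resolve `logstash.apache.access.log` as a schema-qualified identifier. Alternatively, the JDBC sink's `table.name.format` property can override how the table name is derived from the topic, which may also sidestep the dot problem.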

【Comments】:
