【发布时间】:2019-11-24 19:46:39
【问题描述】:
我无法使用 KSQL 表沉入 postgres
我创建了一个 KSQL 表,其中包含来自流的一些聚合(源主题是 Avro)。我可以用 SELECT 查看数据。我也可以将主题直接放到 postgres 中。但是我不能沉入 Postgres 的 KSQL 表。如何指定 value.converter?
我创建了这样的 KSQL 表:
CREATE TABLE some_table AS SELECT customer_name, COUNT(*) as cnt FROM some_stream GROUP BY customer_name;
我尝试了如下连接配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
key.converter.schemas.enable=false
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
tasks.max=1
topics=some_table
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:postgresql://localhost:5432/mydb?user=postgres&password=postgres
key.converter=org.apache.kafka.connect.storage.StringConverter
错误是:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic some_topic to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
我也试过了:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter.schemas.enable": "false",
"auto.evolve": "true",
"tasks.max": "1",
"topics": "some_topic",
"value.converter.schemas.enable": "false",
"auto.create": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connection.url": "jdbc:postgresql://localhost:5432/mydb?user=postgres&password=postgres",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
错误是:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
那么如何使用 JdbcSinkConnector 对 KSQL 表下沉?
【问题讨论】:
-
问题解决了吗?
标签: postgresql apache-kafka apache-kafka-connect ksqldb