【问题标题】:Push Data from Kafka Topic to PostgreSQL in JSON以 JSON 格式将数据从 Kafka 主题推送到 PostgreSQL
【发布时间】:2019-07-29 23:11:04
【问题描述】:

更新后出错

[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)

生产者控制台:

connect-standalone.properties 文件

bootstrap.servers=localhost:9092 
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets 
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java

connect-post.properties 文件

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true 
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets

上面的错误是我通过apache kafka ./bin/connect-standalone.sh config/connect-standalone.properties config.postgresql.properties 造成的。

然后,我已经尝试并实现了这个链接中提到的流程:

https://hellokoding.com/kafka-connect-sinks-data-to-postgres-example-with-avro-schema-registry-and-python

但是,这里的数据是使用 avro 从 Python 代码生成的。但就我而言,我已经有来自 kafka 主题中传感器(JSON 格式)的数据,我想将其发送到 postgreSQL,而不是通过代码生成数据。

那么,我该如何实现从 kafka 主题发送数据到 postgreSQL 的这种流程。

我已共享我的属性文件如果需要更正,请告诉我。 我正在发送简单的 json 数据,例如 "{"cust_id": 1313131, "month": 12, "expenses": 1313.13}" 我也尝试发送此类数据但仍然存在错误

示例 json 数据

 {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "int32",
                    "optional": false,
                    "field": "customer_id"
                },
                {
                    "type": "int32",
                    "optional": true,
                    "field": "month"
                },

                {
                    "type": "string",
                    "optional": true,
                    "field": "amount_paid"
                }
            ],
            "optional": false,
            "name": "msgschema"
        },
        "payload": {
           "cust_id": 13, 
           "month": 12, 
           "expenses": 1313.13
        }
    }

我有一个名为 kafkatable 的表,其列名称为 (customer_id, month, amount_paid) 使用

"CREATE TABLE kafkatable(customer_id int8, month int4, amount_paid decimal(9,2));"

【问题讨论】:

  • 您能否展示您的消息示例(使用控制台消费者)?还有python代码吗?该错误似乎表明您没有完整的 JSON 有效负载
  • 嗨 @cricket_007 到目前为止,我没有使用任何 python 代码,请查看我添加的属性文件,并通过控制台生成器发送我共享的示例 json 数据问题。
  • 你能显示控制台生产者命令吗?如果您使用 cat 和管道直接发送该文件,那么它不能是多行...如果您已经有关于此类主题的现有数据,那么您仍然会收到错误,因此您需要一个新主题
  • @cricket_007 是的,我总是在运行生产者命令或其他连接独立命令之前创建新主题。因此,我在发送的消息中发布了生产者的新图像。 (另外,请告诉我,如果我用模式发送这样的消息,那么我不需要在 postgresql 中手动创建一个表,对吗?生产者自动创建的这个 scheam?)非常感谢您的回复。 .. :) 请检查更新的错误图像..
  • 嗨@cricket_007,请查看更新的图像。

标签: postgresql apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

我通过以下更改解决了这个错误

  1. insert.mode=插入
  2. 注释掉table.name.format=kafkatable,因为表将通过自动创建 创建
  3. 删除 connection.url 行末尾的问号。
  4. pk.fields 此处不应保留 none,请确保提供列名 以避免并发症。
  5. postgresql 不支持 int32,所以当我将它更改为 int8 时,它工作正常。
  6. 架构和负载中的字段名称不同,请确保使用相同的名称。

【讨论】:

  • 谢谢 Vishruti,这些更改使我的 Kafka 到 Postgresql 运行良好。
【解决方案2】:

Kafka Connect 是 Apache Kafka 的一部分,非常适合这种情况。您可以通过here 了解有关 Kafka Connect 的更多信息。

要将数据从您的 Kafka 主题流式传输到 Postgres(或任何其他数据库),请使用 JDBC Sink 连接器,您可以从 here 获得该连接器。

【讨论】:

  • JDBC 接收器不需要 JSON 中的模式吗?
  • 是的,好点,对于关系数据库,你必须在某个时候有一个模式:)
  • 是的@RobinMoffatt,我尝试了 Kafka 连接并也使用了 Sink。但正如 cricket_007 所说,如何将模式放置在生产者中,或者你有任何 python 脚本示例。我为 avro 做了以下事情:docs.confluent.io/current/connect/kafka-connect-jdbc/…,但这是针对 avro,类似地如何为 JSON 数据做这件事。然后,我做到了,stackoverflow.com/questions/49022120/…
  • 然后发生这样的错误,原因是:org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close对象的标记(起始标记在 [Source: (byte[])"{"; line: 1, column: 1]) at [Source: (byte[])"{";行:1,列:3] 引起:com.fasterxml.jackson.core.io.JsonEOFException:意外的输入结束:对象的预期关闭标记(开始标记在 [Source: (byte[])"{" ; line: 1, column: 1]) at [Source: (byte[])"{";行:1,列:3]
  • @cricket_007 嗨,您是否遇到过这些错误。请帮助修复它。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-10
  • 2021-12-19
  • 2017-02-07
  • 2020-07-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多