【问题标题】:Cannot persist KSQL aggregate table to Postgres无法将 KSQL 聚合表持久化到 Postgres
【发布时间】:2020-07-05 10:24:30
【问题描述】:

我正在尝试使用 JDBC 接收器连接器在我的 Postgres DB 中镜像 KSQL 表,但不幸的是我无法使其工作。

我正在使用 Kafka 5.4.1,并且我有 2 个 debezium 1.0 主题使用来自我的 Postgres DB 的 Avro 序列化。这是我的 Debezium 连接器的配置:

    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "xxx",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka-svc:9092",
        "database.history.kafka.topic": "dbhistory.xxx",
        "database.server.name": "xxx",
        "database.port": "5432",
        "plugin.name": "decoderbufs",
        "table.whitelist": "public.a,public.b",
        "database.hostname": "app-db",
        "name": "connector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "database.whitelist": "xxx",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "table"
    }

然后我使用 KSQL CLI 与我的服务器交互并发出以下命令:

CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');

CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');

CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;

CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;

TLDR,我从 debezium 主题创建 2 个流,并对它们进行重新分区,以使它们为 JOIN 做好准备。 然后,我将其中一个 (b_by_id) 转换为表,因为在这种情况下我不想使用窗口连接:

CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');

此时一切正常,我可以使用我的流、表和连接,并看到我的源数据库中的更改立即反映在我在 KSQL 中的流查询中。 当我决定对我的数据执行一些聚合函数并将结果镜像到我的 Postgres 数据库(与源数据库相同)时,我的问题就出现了。为了做到这一点,我创建了一个新的 KSQL 表作为 SELECT 的结果:

CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;

然后,我设置了一个 JDBC 接收器连接器,以使用以下配置将我的新表的 grouped_data 更改日志主题转储到我的数据库:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://app-db:5432/xxx",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "topics": "grouped_data",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-svc:8081",
    "pk.mode": "record_value",
    "pk.fields": "x, y, z",
    "table.name.format" : "kafka_${topic}",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "max_date",
    "transforms.TimestampConverter.target.type": "Timestamp"
}

不幸的是,我的接收器数据库上没有任何错误,也没有数据。连接器已正确创建和配置,即使我强制流式查询处理新消息,也没有数据传输到我的接收器数据库,甚至没有创建目标表。 我尝试使用不同的名称和配置、pk.mode 的不同值等多次创建连接器,但我无法让它工作。为我上面的表“b”创建一个连接器工作得很好,所有数据都会立即传输。

以下是我尝试镜像到 postgres 的 KSQL 表的其他详细信息:

describe extended grouped_data;

Name                 : GROUPED_DATA
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

 Field              | Type                      
------------------------------------------------
 ROWTIME            | BIGINT           (system) 
 ROWKEY             | VARCHAR(STRING)  (system) 
 X                  | BIGINT                    
 Y                  | BIGINT                    
 Z                  | BIGINT                    
 MAX_DATE           | BIGINT                    
------------------------------------------------

谢谢!

【问题讨论】:

  • 感谢一个写得很好的问题,其中包含清晰的示例、版本控制等????
  • 谢谢@RobinMoffatt!

标签: apache-kafka-connect confluent-platform ksqldb


【解决方案1】:

您已将 Kafka Connect 配置为使用小写主题名称

"topics": "grouped_data",

但根据您的DESCRIBE 输出,表写入的主题是大写的:

Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

如果你仔细检查你的 Kafka Connect 工作日志,你会发现:

Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE} 

如果您给它一个不存在的主题,Kafka Connect 将不会中止 - 因为这可能是您打算指定的主题,因为您随后将填充它。

所以,您可以修改您的 Kafka Connect 工作器配置以使用大写主题名称,或者您可以重新定义您的 ksqlDB 表并在 DDL 中包含 …WITH (KAFKA_TOPIC='grouped_data')

【讨论】:

  • 它工作(部分)!现在 Kafka Connect 正在从 Kafka 获取正确的主题元数据,但即使我通过修改源数据库中的表 a 通过 GROUPED_DATA 主题发送消息,我也看不到目标数据库中的表,也看不到任何正在传输的数据。
  • 好的,我找到了最后一点丢失的地方。对于使用带有 KSQL 大写流的 Postgres 的任何人,请确保在查询目标数据库时使用引号!在我的情况下,我的表是由 Kafka Connect 创建的,混合了大写和小写:kafka_GROUPED_TABLE。为了查询该表,您需要执行以下操作: SELECT * FROM "kafka_GROUPED_TABLE"; . TLDR 不要忘记 SQL 查询中的引号或按照上面 Robin 的建议修改 KSQL 表定义 (.... WITH (KAFKA_TOPIC='grouped_data'))
猜你喜欢
  • 2019-04-11
  • 2020-09-09
  • 2019-11-24
  • 1970-01-01
  • 2011-02-07
  • 2022-01-20
  • 2021-09-19
  • 1970-01-01
  • 2019-05-08
相关资源
最近更新 更多