【发布时间】:2020-07-05 10:24:30
【问题描述】:
我正在尝试使用 JDBC 接收器连接器在我的 Postgres DB 中镜像 KSQL 表,但不幸的是我无法使其工作。
我正在使用 Kafka 5.4.1,并且我有 2 个 debezium 1.0 主题使用来自我的 Postgres DB 的 Avro 序列化。这是我的 Debezium 连接器的配置:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka-svc:9092",
"database.history.kafka.topic": "dbhistory.xxx",
"database.server.name": "xxx",
"database.port": "5432",
"plugin.name": "decoderbufs",
"table.whitelist": "public.a,public.b",
"database.hostname": "app-db",
"name": "connector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"database.whitelist": "xxx",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.source.fields": "table"
}
然后我使用 KSQL CLI 与我的服务器交互并发出以下命令:
CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');
CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');
CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;
CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;
TLDR,我从 debezium 主题创建 2 个流,并对它们进行重新分区,以使它们为 JOIN 做好准备。 然后,我将其中一个 (b_by_id) 转换为表,因为在这种情况下我不想使用窗口连接:
CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');
此时一切正常,我可以使用我的流、表和连接,并看到我的源数据库中的更改立即反映在我在 KSQL 中的流查询中。 当我决定对我的数据执行一些聚合函数并将结果镜像到我的 Postgres 数据库(与源数据库相同)时,我的问题就出现了。为了做到这一点,我创建了一个新的 KSQL 表作为 SELECT 的结果:
CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;
然后,我设置了一个 JDBC 接收器连接器,以使用以下配置将我的新表的 grouped_data 更改日志主题转储到我的数据库:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"topics": "grouped_data",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-svc:8081",
"pk.mode": "record_value",
"pk.fields": "x, y, z",
"table.name.format" : "kafka_${topic}",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "max_date",
"transforms.TimestampConverter.target.type": "Timestamp"
}
不幸的是,我的接收器数据库上没有任何错误,也没有数据。连接器已正确创建和配置,即使我强制流式查询处理新消息,也没有数据传输到我的接收器数据库,甚至没有创建目标表。 我尝试使用不同的名称和配置、pk.mode 的不同值等多次创建连接器,但我无法让它工作。为我上面的表“b”创建一个连接器工作得很好,所有数据都会立即传输。
以下是我尝试镜像到 postgres 的 KSQL 表的其他详细信息:
describe extended grouped_data;
Name : GROUPED_DATA
Type : TABLE
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
Field | Type
------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
X | BIGINT
Y | BIGINT
Z | BIGINT
MAX_DATE | BIGINT
------------------------------------------------
谢谢!
【问题讨论】:
-
感谢一个写得很好的问题,其中包含清晰的示例、版本控制等????
-
谢谢@RobinMoffatt!
标签: apache-kafka-connect confluent-platform ksqldb