【发布时间】:2021-01-15 05:34:32
【问题描述】:
我正在尝试使用 kafka-connect 从两个表中获取行。
我是这样配置connect-file-source.properties的
name=jdbc_source_postgres_foobar_01
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:postgresql://localhost:5432/store?user=postgres&password=root
table.whitelist=author,book
mode=incrementing
incrementing.column.name=id
validate.non.null=false
topics=author,book
topic.prefix=
涉及的表有author和book,后者有外键指的是author。
然后我注册了一个监听器来消费来自“作者”和“书籍”主题的消息,以便将它们插入另一个数据库。
@KafkaListener(
topics={"author","book"},
groupId = "foo",
containerFactory = "fooKafkaListenerContainerFactory"
)
public void listenGroupFoo(@Payload PostgresTableRow message) {
System.out.println("Received" + message);
String tableName = message.tableName();
HashMap<String, Object> params = message.params();
insert(tableName, params);
}
当涉及的表彼此之间没有约束时,这工作得很好,但在这种情况下,当来自“book”主题的消息在来自“author”的消息之前被消费时,我会收到错误。
例如,我在源数据库中插入带有id=23 的作者“George Orwell”和带有id=37 和authorId=23 的书“1984”,两条消息被推送到Kafka,一条在“作者”主题还有一个在“书”主题。
如果消息首先从“书”主题消费,然后从“作者”主题消费,我会收到无法在我的接收器数据库中插入 ID 为 37 的书的错误,因为不存在 ID 为 23 的此类作者。
那么我该如何解决呢?有没有办法将多个表推入一个主题并授予订单?
【问题讨论】:
-
您使用的是哪个源和目标数据库引擎?
标签: apache-kafka apache-kafka-connect