【发布时间】:2021-12-03 22:02:15
【问题描述】:
我正在尝试在 Kafka Streams 中加入一个简单的外键连接,类似于许多文章(例如:https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/)。
当我尝试将用户 id(用户表的主键)与 account_balance 表中的外键 user_id 连接以生成 AccountRecord 对象时,我收到以下错误:
[-StreamThread-1] ignJoinSubscriptionSendProcessorSupplier : Skipping record due to null foreign key.
最终目标是在任一表中的任何字段更新时将AccountRecord 传递给主题。问题是当我简单地分别打印用户表和帐户表时,外键和所有字段都被完全填充。我看不出有什么问题或为什么会发生此错误。这是我的代码的 sn-p:
public void start_test(){
StreamsBuilder builder = new StreamsBuilder();
KTable<Long, User> userTable = builder.table(USER_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.User()));
KTable<Long, AccountBalance> accountBalanceTable = builder.table(ACCOUNT_BALANCE_TOPIC, Consumed.with(CustomSerdes.UserPKey(), CustomSerdes.AccountBalance()));
final KTable<Long, AccountRecord> accountRecordTable = accountBalanceTable.join(
userTable,
AccountBalance::getUserId,
(account, user) -> new AccountRecord(user.getFirstName(), account.getBalance());
);
// print the table
accountRecordTable
.toStream()
.print(Printed.toSysOut());
KafkaStreams stream = new KafkaStreams(builder.build(), properties);
stream.start();
}
任何指导都会有所帮助。我没有包含自定义 serde 代码或对象形状,但它们非常简单。如果您需要进一步说明,请告诉我。
谢谢
【问题讨论】:
标签: java apache-kafka-streams apache-kafka-connect confluent-platform