【发布时间】:2019-12-02 06:50:18
【问题描述】:
我有一个 Kafkastreams 应用程序,我正在尝试将交易加入商店。
这适用于新事件,因为商店总是在交易发生之前创建,但是当我尝试读取“历史”数据(在应用程序启动之前发生的事件)时,有时所有交易都会加入商店并且有时更少,大约 60%-80%,但一旦我得到 30%。
这很奇怪,因为我知道所有交易都有一个正确的商店 ID,因为有时它会加入所有交易。 这些事件在一个主题中,我使用过滤器将它们放入两个流中。然后我从商店流中创建一个 KTable,然后将事务流加入到商店表中。
final KStream<String, JsonNode> shopStream = eventStream
.filter((key, value) -> "SHOP_CREATED_EVENT".equals(value.path("eventType").textValue()))
.map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value)
);
final KStream<String, JsonNode> transactionStream = eventStream
.filter((key, value) -> "TRANSACTION_EVENT".equals(value.path("eventType").textValue()))
.map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value)
);
final KTable<String, JsonNode> shopTable = shopStream
.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.reduce(genericReducer);
final KStream<String, JsonNode> joinedStream = transactionStream
.join(shopTable, this::joinToShop, Joined.valueSerde(jsonSerde))
我也尝试使用流连接而不是流到表,结果相同:
final KStream<String, JsonNode> joinedStream = transactionStream
.join(shopStream, this::joinToShop,JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), jsonSerde, jsonSerde) )
最后我将joinedStream写入一个输出主题:
joinedStream
.map((key, value) -> KeyValue.pair(value.path("transactionId").textValue(), value))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), jsonSerde));
然后我创建两个 keyValue 存储来计算原始交易的数量和加入的交易数量:
Materialized.with(Serdes.String(), Serdes.Long());
transactionStream
.map((key, value) -> KeyValue.pair(SOURCE_COUNT_KEY, value))
.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.count(as(SOURCE_STORE))
;
joinedStream
.map((key, value) -> KeyValue.pair(TARGET_COUNT_KEY, value))
.groupByKey(Grouped.with(Serdes.String(), jsonSerde))
.count(as(TARGET_STORE))
;
过滤器之所以起作用,是因为当我在 shopStream 和 transactionStream 中输出所有事件时,我可以看到所有事件都已到达,并且仅在打印所有事件后才开始连接。
我还可以看到,商店创建事件在该商店的交易事件之前到达。还有很奇怪的是,有时当我在同一家商店进行 10 笔交易时,正确连接了 7 笔,而缺少了 3 笔(例如)。
键值存储中的计数也是正确的,因为它在输出主题中的事件数量相同。 缺少的连接不会触发 joinToShop() 方法。
所以我的问题是为什么会这样?有时它会处理所有事件,有时只是其中的一部分?以及如何确保所有活动都加入?
【问题讨论】:
标签: join apache-kafka apache-kafka-streams