【问题标题】:KafkaStreams - joins are not always triggeredKafkaStreams - 连接并不总是被触发
【发布时间】:2019-12-02 06:50:18
【问题描述】:

我有一个 Kafkastreams 应用程序,我正在尝试将交易加入商店。

这适用于新事件,因为商店总是在交易发生之前创建,但是当我尝试读取“历史”数据(在应用程序启动之前发生的事件)时,有时所有交易都会加入商店并且有时更少,大约 60%-80%,但一旦我得到 30%。

这很奇怪,因为我知道所有交易都有一个正确的商店 ID,因为有时它会加入所有交易。 这些事件在一个主题中,我使用过滤器将它们放入两个流中。然后我从商店流中创建一个 KTable,然后将事务流加入到商店表中。

final KStream<String, JsonNode> shopStream = eventStream
            .filter((key, value) -> "SHOP_CREATED_EVENT".equals(value.path("eventType").textValue()))
            .map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value)
            );

final KStream<String, JsonNode> transactionStream = eventStream
            .filter((key, value) -> "TRANSACTION_EVENT".equals(value.path("eventType").textValue()))
            .map((key, value) -> KeyValue.pair(value.path("shop_id").textValue(), value)
            );

final KTable<String, JsonNode> shopTable = shopStream
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .reduce(genericReducer);

final KStream<String, JsonNode> joinedStream = transactionStream
            .join(shopTable, this::joinToShop, Joined.valueSerde(jsonSerde))

我也尝试使用流连接而不是流到表,结果相同:

final KStream<String, JsonNode> joinedStream = transactionStream
            .join(shopStream, this::joinToShop,JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), jsonSerde, jsonSerde) )

最后我将joinedStream写入一个输出主题:

  joinedStream
            .map((key, value) -> KeyValue.pair(value.path("transactionId").textValue(), value))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), jsonSerde));

然后我创建两个 keyValue 存储来计算原始交易的数量和加入的交易数量:

    Materialized.with(Serdes.String(), Serdes.Long());
    transactionStream
            .map((key, value) -> KeyValue.pair(SOURCE_COUNT_KEY, value))
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count(as(SOURCE_STORE))
    ;


    joinedStream
            .map((key, value) -> KeyValue.pair(TARGET_COUNT_KEY, value))
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count(as(TARGET_STORE))
    ;

过滤器之所以起作用,是因为当我在 shopStream 和 transactionStream 中输出所有事件时,我可以看到所有事件都已到达,并且仅在打印所有事件后才开始连接。

我还可以看到,商店创建事件在该商店的交易事件之前到达。还有很奇怪的是,有时当我在同一家商店进行 10 笔交易时,正确连接了 7 笔,而缺少了 3 笔(例如)。

键值存储中的计数也是正确的,因为它在输出主题中的事件数量相同。 缺少的连接不会触发 joinToShop() 方法。

所以我的问题是为什么会这样?有时它会处理所有事件,有时只是其中的一部分?以及如何确保所有活动都加入?

【问题讨论】:

    标签: join apache-kafka apache-kafka-streams


    【解决方案1】:

    根据时间戳处理数据。但是,在旧版本中,Kafka Streams 采用尽力而为的方法来根据时间戳从不同主题读取数据。

    我建议升级到 2.1 版(或更高版本)以改进时间戳同步并应避免该问题(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多