【问题标题】:Kafka Streams: How to get the first and the last record of a SessionWindow?Kafka Streams:如何获取 SessionWindow 的第一条和最后一条记录?
【发布时间】:2019-03-27 01:34:13
【问题描述】:

默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60)) 为每个传入记录返回一条记录。

结合.count().filter() 可以轻松检索第一条记录。

使用 .suppress(Suppressed.untilWindowCloses(unbounded())) 检索最后一条记录也很容易。

所以……我做了两次处理,你可以看到改编后的字数示例:


final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");

streamsBranches
  .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, value) -> ""+value)
  .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
  .toStream()
  .map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
  .filter((wk, v) -> v == 1)
  .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

streamsBranches
  .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, value) -> ""+value)
  .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .toStream()
  .map((wk, v) -> new KeyValue<>(wk.key(), v))
  .filter((wk, v) -> v != null)
  .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

但我想知道是否有更简单、更漂亮的方式来做同样的事情。

【问题讨论】:

  • 您想获得什么样的第一条和最后一条记录?您使用基于不同 keys 的分组,所以我认为它不会像您预期的那样工作。
  • 不同的key用来说明我想要什么。我编辑了帖子以删除它们以使其更清晰。我只想从会话窗口中获取第一条和最后一条记录。
  • 我认为你的代码仍然没有达到你想要的效果。

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

我认为您应该使用 SessionWindowedKStream::aggregate(...) 并根据您的逻辑将结果累积到 aggregator(第一个和最后一个值)

示例代码可能如下所示:

streamsBranches.groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
        .aggregate(
                AggClass::new,
                (key, value, oldAgg) -> oldAgg.update(value),
                (key, agg1, agg2) -> agg1.merge(agg2),
                Materialized.with(Serdes.String(), new AggClassSerdes())
        ).suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));

AggClass 是累加器,AggClassSerdes 是该累加器的 Serdes

public class AggClass {
    private String first;
    private String last;

    public AggClass() {}

    public AggClass(String first, String last) {
        this.first = first;
        this.last = last;
    }

    public AggClass update(String value) {
        if (first == null)
            first = value;
        last = value;
        return this;
    }

    public AggClass merge(AggClass other) {
        if (this.first == null)
            return other;
        else return new AggClass(this.first, other.last);
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-28
    • 2017-05-08
    • 2012-01-23
    相关资源
    最近更新 更多