【问题标题】:Aggregate a large amount of data using Kafka streams使用 Kafka 流聚合大量数据
【发布时间】:2017-12-08 10:34:57
【问题描述】:

我尝试使用 Kafka Streams 使用不同大小的时间窗口聚合大量数据。

我将缓存大小增加到 2 GB,但是当我在 1 小时内设置窗口大小时,CPU 负载达到 100%,应用程序开始变慢。

我的代码如下所示:

val tradeStream = builder.stream<String, Trade>(configuration.topicNamePattern, Consumed.with(Serdes.String(), JsonSerde(Trade::class.java)))

tradeStream
    .groupBy(
            { _, trade -> trade.pair },
            Serialized.with(JsonSerde(TokensPair::class.java), JsonSerde(Trade::class.java))
    )
    .windowedBy(TimeWindows.of(windowDuration).advanceBy(windowHop).until(windowDuration))
    .aggregate(
        { Ticker(windowDuration) },
        { _, newValue, aggregate -> aggregate.add(newValue) },
        Materialized.`as`<TokensPair, Ticker>(storeByPairs)
                .withKeySerde(JsonSerde(TokensPair::class.java))
                .withValueSerde(JsonSerde(Ticker::class.java))
    )
    .toStream()
    .filter { tokensPair, _ -> filterFinishedWindow(tokensPair.window(), windowHop) }
    .map { tokensPair, ticker -> KeyValue(
            TickerKey(ticker.tokensPair!!, windowDuration, Timestamp(tokensPair.window().start())),
            ticker.calcPrice()
    )}
    .to(topicName, Produced.with(JsonSerde(TickerKey::class.java), JsonSerde(Ticker::class.java)))

此外,在将聚合数据发送到 kafka 主题之前,它们会根据窗口的结束时间进行过滤,以便发送到刚刚结束的主题窗口。

也许有一些更好的方法来实现这种聚合?

【问题讨论】:

    标签: apache-kafka aggregate apache-kafka-streams


    【解决方案1】:

    如果对系统有更多了解,就很难诊断。

    您的集群中有多少个分区? 您正在运行多少个流应用程序? 流应用程序是否在同一台机器上运行? 您是否对有效负载使用压缩? 它适用于较小的间隔吗?

    希望对您有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多