【问题标题】:How to increase the topic consumer throughput by using `Function<Flux<String>, Flux<String>>`?如何使用 `Function<Flux<String>, Flux<String>>` 增加主题消费者的吞吐量?
【发布时间】:2020-07-07 12:42:39
【问题描述】:

我有一个基于 webFlux 的服务,将消费然后产生来自 kafka 主题的消息。 我的代码是这样的

@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
    return flux -> flux.map(val -> val.toUpperCase());
}

我发现当我有 2 个实例时,我每 30 分钟可以消耗 750 条消息,但我的 CPU 从未高于 10%。 随着时间的推移,滞后不断增加,所以我想知道如何提高消费者的吞吐量。 从文档看,reactive 的并发不生效,link

有谁知道如何在不添加更多实例的情况下提高吞吐量?

【问题讨论】:

    标签: spring-webflux spring-cloud-stream reactor spring-cloud-stream-binder-kafka


    【解决方案1】:

    当我使用 kotlin 时,我发现使用的是 Flow.flatMapMerge(parallelCount)

    【讨论】:

      猜你喜欢
      • 2020-10-13
      • 2018-08-31
      • 2012-07-05
      • 2015-04-09
      • 2013-04-16
      • 2019-06-14
      • 2017-06-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多