【发布时间】:2023-02-02 14:56:10
【问题描述】:
请问一个关于带有 reactor kafka 的 SpringBoot 3 应用程序的小问题。
我有一个小型反应式 kafka 消费者应用程序,它使用来自 kafka 的消息并处理消息。
该应用程序正在使用一个主题the-topic,其中有三分区.
该应用程序是docker化的,出于资源消耗限制的原因,该应用程序只能使用2个CPU(请耐心等待)。为了让事情变得更困难,我只能拥有一个独特的例子这个应用程序正在运行。
该应用程序非常简单:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
@Configuration
public class MyKafkaConfiguration {
@Bean
public KafkaReceiver<String, String> reactiveKafkaConsumerTemplate(KafkaProperties kafkaProperties) {
kafkaProperties.setBootstrapServers(List.of("my-kafka.com:9092"));
kafkaProperties.getConsumer().setGroupId("should-i-do-something-here");
final ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
basicReceiverOptions.subscription(Collections.singletonList("the-topic"));
return new DefaultKafkaReceiver<>(ConsumerFactory.INSTANCE, basicReceiverOptions);
}
}
@Service
public class MyConsumer implements CommandLineRunner {
@Autowired
private KafkaReceiver<String, String> kafkaReceiver;
@Override
public void run(String... args) {
myConsumer().subscribe();
}
public Flux<String> myConsumer() {
return kafkaReceiver.receive()
.flatMap(oneMessage -> consume(oneMessage))
.doOnNext(abc -> System.out.println("successfully consumed {}={}" + abc))
.doOnError(throwable -> System.out.println("something bad happened while consuming : {}" + throwable.getMessage()));
}
private Mono<String> consume(ConsumerRecord<String, String> oneMessage) {
// this first line is a heavy in memory computation which transforms the incoming message to a data to be saved.
// it is very intensive computation, but has been tested NON BLOCKING by different tools, and takes 1 second :D
String transformedStringCPUIntensiveNonButNonBLocking = transformDataNonBlockingWithIntensiveOperation(oneMessage);
//then, just saved the correct transformed data into any REACTIVE repository :)
return myReactiveRepository.save(transformedStringCPUIntensiveNonButNonBLocking);
}
}
如果我正确理解项目反应堆,并且由于我的资源限制,我将最多有 2 个反应堆核心。
这这里的consume方法已经过非阻塞测试, 但需要一秒钟来处理消息。
因此,我每秒只能消费 2 条消息吗? (希望不会)
消息可以按任何顺序使用,我希望通过这个应用程序最大化吞吐量。
请问如何在这些限制下最大化此应用程序的并行度/吞吐量?
谢谢
【问题讨论】:
-
那么,您的消费方法是 CPU 密集型的吗?如果是,则您无能为力,因为它需要全天候使用 CPU 来完成这项工作。但是,如果您发现您的 CPU 没有被完全使用,那么可能是您的消费函数以某种方式阻塞了。你能提供一些关于什么的信息吗消耗做 ?对于它是非阻塞的,这意味着它只执行内存计算。否则,如果它向数据库或网络服务发送数据,它就会阻塞。
标签: java spring-boot apache-kafka spring-kafka reactor-kafka