【问题标题】:Reactor Kafka: message consumption always on one thread no matter the number of CPU from machineReactor Kafka:无论机器的 CPU 数量如何,消息消费始终在一个线程上
【发布时间】:2023-02-11 05:07:00
【问题描述】:

请问关于 Reactor Kafka 的小问题。

我有一个非常简单的 Reactor Kafka 项目。

package com.example.micrometer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;

@SpringBootApplication
public class StreamReactiveConsumerApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(StreamReactiveConsumerApplication.class);

    public static void main(String... args) {
        new SpringApplicationBuilder(StreamReactiveConsumerApplication.class).run(args);
    }

    @Override
    public void run(String... args) {
    }

    @Bean
    Consumer<Flux<Message<String>>> consume() {
        return flux -> flux.flatMap(one -> myHandle(one) ).subscribe();
    }

    private Mono<String> myHandle(Message<String> one) {
        log.info("<==== look at this thread" + "\u001B[32m" + one.getPayload() + "\u001B[0m");
        String payload = one.getPayload();
        String decryptedPayload = complexInMemoryDecryption(payload); //this is NON blocking, takes 1 second
        String complexMatrix = convertDecryptedPayloadToGiantMatrix(decryptedPayload);  //this is NON blocking, takes 1 second
        String newMatrix = matrixComputation(complexMatrix); //this is NON blocking, takes 1 second
        return myNonBlockingReactiveRepository.save(complexMatrix);
    }

}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>streamreactiveconsumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.2</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2022.0.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

(注意,不是Spring Kafka项目,不是Spring Cloud Stream项目)

我正在使用一个包含 3 个分区的主题。发送消息的速率是每秒一条消息。

消息的消费和处理每条消息需要 3 秒左右的时间。

重要提示:请注意该处理不包含任何阻塞操作。它是内存解密+巨型矩阵计算的巨人。它是 BlockHound 测试的非阻塞。

实际的: 当我使用 Reactor Kafka 项目使用消息时,整个使用只发生在一个线程上。一切都发生在container-0-C-1

一切都会发生在container-0-C-1,用2个CPU、4个CPU、8个CPU的硬件测试

2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :
2023-02-06 10:42:59 8384 INFO  --- [KafkaConsumerDestination{consumerDestinationName='prod_audit_hdfs', partitions=3, dlqName='null'}.container-0-C-1] [stream-reactive-consumer,,] c.e.m.StreamReactiveConsumerApplication :

预期的: 我们从基于 http webflux 迁移到基于 Kafka 消费。业务逻辑没有一点改变。

在 Reactor Netty Spring webflux 应用程序上,我们可以看到处理是从多个线程发生的,对应于反应堆核心。在具有多个内核的机器上,这很容易跟上。

[or-http-epoll-1] [or-http-epoll-2] [or-http-epoll-3] [or-http-epoll-4]

只需在这些 reactor-http-epoll-N 中的任何一个之间切换即可进行处理。 我可以看到 reactor-http-epoll-1 何时处理第一条消息的复杂内存计算,reactor-http-epoll-3 将处理第二条消息的计算,等等......并行性很明显

我知道有办法“扩展”这个应用程序,但这是 Reactor Kafka 本身的问题。

我希望这些消息可以并行处理。第一条消息使用某种 container-0-C-1,第二条消息使用 container-0-C-2,等等...

请问我怎样才能做到这一点? 我错过了什么?

谢谢

【问题讨论】:

    标签: java spring-boot apache-kafka parallel-processing reactor-kafka


    【解决方案1】:

    通常在 kafka 消费者中,将轮询周期与处理逻辑分开是个好主意。 KafkaConsumer 也有原生的 I/O 线程。有时,这种架构被称为“带流水线的消费者”。在这个架构中,轮询线程不断地从 kafka 获取记录,然后将它们“提供”到某个有界缓冲区/队列(即ArrayBlockingQueueLinkedBlockingQueue)。另一方面,处理线程从队列中获取记录并进行处理。它允许将轮询逻辑与实现缓冲和反压力的处理分离。

    Reactor Kafka 建立在KafkaConsumer API 之上,并使用类似的架构实现带有背压的反应流。 KafkaReceiver 提供轮询周期,默认情况下,在Schedulers.single 线程上发布获取的记录。

    现在,根据您的逻辑,您可以顺序或并行处理数据和提交偏移量。对于并发处理,使用 flatMap,默认情况下并行处理 256 条记录,可以使用 concurrency 参数进行控制。

    kafkaReceiver.receive()
        .flatMap(rec -> proces(rec), concurrency)
    

    如果添加日志记录,您会看到所有记录都在kafka-receiver-2 上接收,但在不同的parallel-# 线程上处理。请注意,记录是按分区顺序接收的。

    12:50:08.347  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-2, partition: 0
    12:50:08.349  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-3, partition: 0
    12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-4, partition: 0
    12:50:08.350  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-6, partition: 0
    12:50:08.351  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-9, partition: 0
    12:50:08.353  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-0, partition: 2
    12:50:08.354  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-8, partition: 2
    12:50:08.355  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-1, partition: 1
    12:50:08.356  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-5, partition: 1
    12:50:08.358  [kafka-receiver-2] INFO [c.e.d.KafkaConsumerTest] - receive: value-7, partition: 1
    12:50:09.353  [parallel-3] INFO [c.e.d.KafkaConsumerTest] - process: value-2, partition: 0
    12:50:09.353  [parallel-6] INFO [c.e.d.KafkaConsumerTest] - process: value-6, partition: 0
    12:50:09.353  [parallel-4] INFO [c.e.d.KafkaConsumerTest] - process: value-3, partition: 0
    12:50:09.353  [parallel-5] INFO [c.e.d.KafkaConsumerTest] - process: value-4, partition: 0
    12:50:09.355  [parallel-7] INFO [c.e.d.KafkaConsumerTest] - process: value-9, partition: 0
    12:50:09.360  [parallel-10] INFO [c.e.d.KafkaConsumerTest] - process: value-1, partition: 1
    12:50:09.360  [parallel-9] INFO [c.e.d.KafkaConsumerTest] - process: value-8, partition: 2
    12:50:09.360  [parallel-8] INFO [c.e.d.KafkaConsumerTest] - process: value-0, partition: 2
    12:50:09.361  [parallel-11] INFO [c.e.d.KafkaConsumerTest] - process: value-5, partition: 1
    12:50:09.361  [parallel-12] INFO [c.e.d.KafkaConsumerTest] - process: value-7, partition: 1
    

    换句话说,这是设计使然,您不必担心轮询逻辑。您可以通过增加 flatMap 的并行度来扩展处理。

    【讨论】:

      猜你喜欢
      • 2023-02-02
      • 2020-12-07
      • 1970-01-01
      • 2023-01-31
      • 2022-06-15
      • 2019-03-26
      • 2019-08-27
      • 2016-12-27
      • 2014-02-13
      相关资源
      最近更新 更多