【问题标题】:Spring Batch Kafka Consumer Job : Honor message groups across multiple JVM processesSpring Batch Kafka Consumer Job:跨多个 JVM 进程尊重消息组
【发布时间】:2026-01-16 02:40:01
【问题描述】:

我有一个简单的Spring Batch Kafka Consumer Job,它从 Kafka 主题读取数据并将数据写入文件。

我想生成 5 个实例 我的 Kafka 消费者作业,以便作业可以更快地完成。也就是说,我启动了我的程序 5 次,以便 5 个消费者作业在他们自己的 JVM 进程中启动

这种方法的直接问题是 5 个进程将写入同一个文件。我通过在文件名中附加一个唯一的进程 ID 来解决这个问题。我更新的 writer bean 如下:

private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();    

@Bean
public FlatFileItemWriter<String> testFileWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(
            "I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
    writer.setAppendAllowed(false);
    writer.setShouldDeleteIfExists(true);
    DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter(",");
    writer.setLineAggregator(lineAggregator);
    return writer;
}

通过将时间戳附加到输出文件名,我确保每个消费者 JVM 进程都写入自己的文件。

当我最终启动同一程序的 5 个实例(JVM 进程)时,我的期望是,如果在它自己的 JVM 进程中运行的一个消费者作业从一个分区中读取消息,那么在它们自己的 JVM 进程中运行的其他消费者作业不会从同一个分区再次读取相同的消息(因为所有 5 个 Java 进程都将使用同一个使用者组,即 mygroup

但是,我可以看到每个消费者作业进程 (JVM) 最终都会读取所有消息。结果,我现在有 5 个文件,每个文件都包含相同的内容。示例输出文件名以及每个文件中的记录数,以便更好地解释:

output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records

Total records: 5 million

问题:如何配置我的 Spring Batch 作业,以便即使使用使用者作业启动多个 Java 进程,Java 进程也只读取同一组中的使用者尚未读取的数据作为单独的 Java 进程启动?

这是我的预期输出(只是代表性的):

output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records 

Total records : 1 million

【问题讨论】:

  • Kafka 主题包含多少个分区,您尝试并行消费这些分区?
  • @ShreyJakhmola 如链接问题中所述,我的主题有 4 个分区。每个 Java 批处理都在使用来自所有分区的数据。

标签: java apache-kafka spring-batch consumer


【解决方案1】:

在同一组 ID 中运行具有相同消费者 ID 的 Kafka 消费者的多个实例不会帮助您实现并行性。

Kafka 消费者的并行性可以通过使用多个消费者来实现,每个消费者具有不同的消费者 ID 和相同的消费者组 ID。消费者组是多个消费者在一个组下的分组机制。数据在组的所有消费者之间平均分配,组中没有两个消费者接收相同的数据。

在将分区分配给消费者之前,Kafka 将首先检查是否有任何具有给定 group-id 的现有消费者。 当没有具有给定 group-id 的现有消费者时,它将将该主题的所有分区分配给这个新消费者。 当已经有两个消费者使用给定的 group-id 并且第三个消费者想要使用相同的 group-id 消费时。它将在所有三个消费者之间平均分配分区。不会将同一 group-id 的两个使用者分配到同一分区。

示例 假设有一个主题有 4 个分区和两个消费者,consumer-Aconsumer-B 想要使用 group-id my-consumer-group 从中消费,那么 Kafka 将为每个消费者分配相等数量的分区,即 2 到 @987654325 @ 和2 to the consumer-B

对于您的用例,由于 Kafka 主题包含 4 个分区,因此您可以使用 4 个消费者,每个消费者具有不同的消费者 ID 并且具有相同的组 ID。

【讨论】:

  • 我已经了解消费者群体的运作方式。在我的问题中,我清楚地解释了我将启动我的程序 5 次。这意味着我的程序的每个实例都将尝试从所有分区中读取(如您所见,Spring Batch 要求为 KafkaItemReader 提供要读取的分区)。如何确保当消费者进程 1 从分区 1 读取消息 1 时,消费者进程 2 不会读取相同的消息。 Spring batch KafkaItemReader 设计为始终从分区的开头读取。请阅读有关 KafkaItemReader 的信息。。
  • 为什么要运行同一个应用程序的 5 个实例?如果您的答案是快速并行消费,那么方法是错误的,这不是 Kafka 实现并行性或快速消费的方式。
  • 此外,在 Spring-batch 4.3 10 月版本中,offset order 属性将可用于 kafkaItemReader,因为它不是一个功能,而是如 here 提到的一个问题。
【解决方案2】:

创建KafkaItemReader时,可以指定要从哪个分区读取:

KafkaItemReader reader = new KafkaItemReader(myConsumerProperties, "topic1", 0)

上述阅读器将从topic1中的分区0读取消息。因此,在您的情况下,您可以并行运行作业并将每个作业配置为从不同主题读取消息(例如,将主题/分区作为作业参数传递)。

【讨论】:

  • 这意味着并行度受到物理主题分区数量的限制。我正在寻找的是一种产生比分区数量更多的消费者的方法。因此,例如,如果我有 4 个分区,我想启动 8 个消费者,这样如果一个消费者阅读一条消息,同一组中的其他消费者将不会阅读相同的消息。有没有办法在 Spring Batch with Kafka 中实现这一点?