【发布时间】:2026-01-16 02:40:01
【问题描述】:
我有一个简单的Spring Batch Kafka Consumer Job,它从 Kafka 主题读取数据并将数据写入文件。
我想生成 5 个实例 我的 Kafka 消费者作业,以便作业可以更快地完成。也就是说,我启动了我的程序 5 次,以便 5 个消费者作业在他们自己的 JVM 进程中启动。
这种方法的直接问题是 5 个进程将写入同一个文件。我通过在文件名中附加一个唯一的进程 ID 来解决这个问题。我更新的 writer bean 如下:
private static final String UNIQUE_PROCESS_IDENTIFIER = System.currentTimeMillis();
@Bean
public FlatFileItemWriter<String> testFileWriter() {
FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(
"I:/CK/data/output_from_consumer_"+UNIQUE_PROCESS_IDENTIFIER+".dat"));
writer.setAppendAllowed(false);
writer.setShouldDeleteIfExists(true);
DelimitedLineAggregator<String> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
writer.setLineAggregator(lineAggregator);
return writer;
}
通过将时间戳附加到输出文件名,我确保每个消费者 JVM 进程都写入自己的文件。
当我最终启动同一程序的 5 个实例(JVM 进程)时,我的期望是,如果在它自己的 JVM 进程中运行的一个消费者作业从一个分区中读取消息,那么在它们自己的 JVM 进程中运行的其他消费者作业不会从同一个分区再次读取相同的消息(因为所有 5 个 Java 进程都将使用同一个使用者组,即 mygroup)
但是,我可以看到每个消费者作业进程 (JVM) 最终都会读取所有消息。结果,我现在有 5 个文件,每个文件都包含相同的内容。示例输出文件名以及每个文件中的记录数,以便更好地解释:
output_from_consumer_1600530320385.dat -> 1 million records
output_from_consumer_1600530335555.dat -> 1 million reocrds
output_from_consumer_1900530335555.dat -> 1 million records
output_from_consumer_1900530335556.dat -> 1 million records
output_from_consumer_1900730334556.dat -> 1 million records
Total records: 5 million
问题:如何配置我的 Spring Batch 作业,以便即使使用使用者作业启动多个 Java 进程,Java 进程也只读取同一组中的使用者尚未读取的数据作为单独的 Java 进程启动?
这是我的预期输出(只是代表性的):
output_from_consumer_1600530320385.dat -> 100,000 records
output_from_consumer_1600530335555.dat -> 200,000 records
output_from_consumer_1900530335555.dat -> 200,000 records
output_from_consumer_1900530335556.dat -> 400,000 records
output_from_consumer_1900730334556.dat -> 100,000 records
Total records : 1 million
【问题讨论】:
-
Kafka 主题包含多少个分区,您尝试并行消费这些分区?
-
@ShreyJakhmola 如链接问题中所述,我的主题有 4 个分区。每个 Java 批处理都在使用来自所有分区的数据。
标签: java apache-kafka spring-batch consumer