【发布时间】:2018-04-26 01:09:31
【问题描述】:
我有一个Kafka topic 和1-partition。 1 listener 在我的 spring-boot 应用程序中使用 @KafkaListener 定义。 listener 使用 ThreadPoolTaskExecutor 选择 ConsumerRecord 并处理它。但是,在这种情况下,我可以看到 kafka 承诺的严格排序不成立,因为我可以看到offsets 在并行线程开始处理时有时会跳跃(使用时间戳验证)......所以问题:
- 为什么内部并行线程的排序不遵循 听众?
- 如何同时实现并行和排序,所以 并行线程拾取下一个偏移量而不是跳转?
编辑 1
public class DefaultTopicListener {
@Autowired
ThreadPoolTaskExecutor executorPool;
@KafkaListener(topicPartitions=@TopicPartition(topic="defaultTopic",
partitions={"0"}))
public void onMessage(ConsumerRecord<String, CustomPayload> request) {
CustomPayload message = request.value();
try {
executorPool.execute(new Runnable() {
@Override
public void run() {
logger.info(
"onMessage : executorPool_THREAD_{}-> -> Offset {}.... ",
Thread.currentThread().getId(), request.offset());
}
});
} catch (RejectedExecutionException ex) {
logger.error(
"onMessage : executorPool -> Queue Full Request Rejected for offset -> {}", ex, );
}
}
public class Config {
@Bean("executorPool")
public ThreadPoolTaskExecutor executorPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(5);
return executor;
}
}
请多多指教。
【问题讨论】: