【问题标题】:Consuming Kafka using ThreadPool doesn't ensure Ordering?使用 ThreadPool 消费 Kafka 并不能确保 Ordering?
【发布时间】:2018-04-26 01:09:31
【问题描述】:

我有一个Kafka topic1-partition1 listener 在我的 spring-boot 应用程序中使用 @KafkaListener 定义。 listener 使用 ThreadPoolTaskExecutor 选择 ConsumerRecord 并处理它。但是,在这种情况下,我可以看到 kafka 承诺的严格排序不成立,因为我可以看到offsets 在并行线程开始处理时有时会跳跃(使用时间戳验证)......所以问题:

  1. 为什么内部并行线程的排序不遵循 听众?
  2. 如何同时实现并行和排序,所以 并行线程拾取下一个偏移量而不是跳转?

编辑 1

public class DefaultTopicListener {
    @Autowired
    ThreadPoolTaskExecutor executorPool;

    @KafkaListener(topicPartitions=@TopicPartition(topic="defaultTopic", 
partitions={"0"}))
    public void onMessage(ConsumerRecord<String, CustomPayload> request) {
        CustomPayload message = request.value();
        try {
            executorPool.execute(new Runnable() {
                @Override
                public void run() {
                    logger.info(
                            "onMessage : executorPool_THREAD_{}-> -> Offset {}.... ",
                            Thread.currentThread().getId(), request.offset());
                }
            });
        }  catch (RejectedExecutionException ex) {
            logger.error(
                    "onMessage : executorPool -> Queue Full Request Rejected for offset -> {}", ex, );
        }
    }
public class Config {
    @Bean("executorPool")
    public ThreadPoolTaskExecutor executorPool(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(5);

        return executor;
    }
}

请多多指教。

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    Kafka 通常建议每个消费者使用一个线程。如果您想在这种情况下将处理与消耗分离,则将 ConsumerRecords 实例移交给由实际处理记录处理的处理器线程池消耗的阻塞队列。

    https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

    但是在这种情况下不能保证顺序,因为线程将独立执行较早的数据块实际上可能只是由于线程执行时间的运气而在较晚的数据块之后处理。

    可以通过多个分区和单个线程负责分区来实现排序和并行,分区中的所有记录将由线程按顺序处理。

    【讨论】:

      【解决方案2】:

      不清楚你的意思。线程池不“挑选”东西,它们被赋予运行任务。您需要出示您的代码。

      推测...

      如果您的侦听器将 ConsumerRecord 传递给线程池,那么记录顺序当然会丢失,因为记录是在不同的线程上处理的(除非池的大小为 1)。

      对于单个分区,侦听器容器在单个线程上调用侦听器。如果你想保持秩序,你不能把工作交给其他线程。

      实现并发的唯一方法是使用多个分区,增加容器上的并发。分区将分布在容器线程中。

      或者,您需要管理代码中的确认以确保没有提交“跳转”。

      仅在一个分区内保证排序,因此,同样,您不能将其移交给另一个线程。

      【讨论】:

      • 监听器不应该按顺序切换到任何线程吗?侦听器是否绑定到单个消费者,该消费者能够维护订单并按顺序进行任何处理而不会破坏偏移排序?由于我维护一个 1 Partition/1 Listener,我的期望是阅读 Topic has to be ordered.. 我错过了什么?
      • 侦听器线程轮询消费者并一次调用一个侦听器,所以,是的,它维持秩序,但如果你将它交给另一个线程,秩序就会丢失 - 再次 - 你需要显示您的代码,以便我可以更好地理解您在做什么。编辑问题以显示您的代码。
      • 更新了测试用例。
      • 对 - 这就是我怀疑的 - 你根本不能这样做,这取决于容器配置(AckMode),一旦消息传递给执行者,偏移量就会被提交(AckMode.RECORD),或者当批处理中的最后一条消息被传递时 (AckMode.BATCH)。如果服务器崩溃,您将丢失消息(在 TaskExecutor 中排队)。您可以使用 MANUAL ack 模式处理它,但您必须跟踪偏移量,这样您就不会提交尚未处理的偏移量。一般来说,使用多个分区会更好/更容易。
      • 我尝试了手动确认,在将其交给 ThreadPoolExecutor 之后,因此如果线程池抛出异常,则不应调用确认。但这并没有阻止消息被传递给侦听器,即使抛出了异常。因此,我采用了@asolanki 建议的设计指针并做了一些事情:1)在消费者端启用 BATCH 处理 2)在没有活动线程时限制任何消息传递到池。通过这 2 个设置,问题在很大程度上得到了控制。消息显示为有序。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2023-01-31
      • 1970-01-01
      • 1970-01-01
      • 2018-06-04
      • 1970-01-01
      • 2019-05-01
      • 1970-01-01
      相关资源
      最近更新 更多