【问题标题】:Achieve concurrency in Kafka consumers在 Kafka 消费者中实现并发
【发布时间】:2019-06-19 14:34:52
【问题描述】:

我们正在努力使我们的 Kafka 消费者并行化,以处理更多的记录来处理峰值负载。我们已经在做的一种方法是在同一个消费者组中启动尽可能多的消费者。

我们的消费者处理目前为止同步的 API 调用。我们觉得让这个 API 调用异步会使我们的消费者处理更多的负载。因此,我们正在尝试使 API 调用异步,并在其响应中增加偏移量。但是,我们看到了一个问题:

通过使 API 调用异步,我们可能会首先获得最后一条记录的响应,并且到那时之前记录的 API 调用都没有启动或完成。如果我们在收到最后一条记录的响应后立即提交偏移量,则偏移量将更改为最后一条记录。与此同时,如果消费者重新启动或分区重新平衡,我们将不会收到在我们提交偏移量的最后一条记录之前的任何记录。这样一来,我们就会错过未处理的记录。

到目前为止,我们已经有 25 个分区。我们期待了解是否有人在不增加分区的情况下实现了并行化,或者增加分区是实现并行化的唯一方法(以避免偏移问题)。

【问题讨论】:

  • 您没有提供足够的信息,您提供了这样一个事实,即记录似乎依赖于首先处理的先前记录,您需要解释数据结构以及为什么它依赖于先前记录,从松散的声音来看,您需要一起处理这些记录,或者建立一个同步该过程的机制。目前问题是太广泛了

标签: parallel-processing apache-kafka kafka-consumer-api consumer


【解决方案1】:

首先,您需要将消息的读取与这些消息的处理分离(如果只是一开始)。接下来看看你可以对你的 API 进行多少并发调用,因为调用它的频率超过服务器可以处理的频率没有任何意义,无论是否异步。如果并发 API 调用的数量大致等于您在主题中的分区数量,那么异步调用 API 是没有意义的。

如果分区的数量明显少于可能的并发 API 调用的最大数量,那么您有几个选择。您可以按照您的建议通过异步调用 API 来尝试使用更少的线程(每个使用者一个)进行最大数量的并发 API 调用,或者您可以创建更多线程并同步进行调用。当然,你会遇到这样一个问题:你的消费者如何将他们的工作交给更多的共享线程,但这正是 Flink 或 Storm 等流执行平台为你做的事情。提供检查点处理的流平台(如 Flink)也可以处理您在消息处理乱序时如何处理偏移提交的问题。您可以滚动您自己的检查点处理并滚动您自己的共享线程管理,但您必须非常希望避免使用流式执行平台。

最后,您的消费者可能比最大可能的并发 API 调用多,但我建议您只拥有更少的消费者和共享分区,而不是 API 调用线程。

当然,您可以随时更改主题分区的数量,以使上述首选选项更可行。

无论哪种方式,要回答您的具体问题,您需要了解 Flink 如何使用 Kafka 偏移提交进行检查点处理。为了过度简化(因为我认为您不想自己滚动),kafka 消费者不仅必须记住他们刚刚提交的偏移量,而且还必须保留之前提交的偏移量,这定义了一个消息块流过您的应用程序。要么整个​​消息块被一直处理,要么您需要将每个线程的处理状态回滚到处理前一个块中的最后一条消息的点。同样,这是一个主要的过度简化,但这就是它的完成方式。

【讨论】:

    【解决方案2】:

    你得看kafkabatch处理。简而言之:您可以使用少量(甚至单个)partitions 来设置巨大的batch.size。就目前而言,messages 的整个 batch 消耗在 consumer 端(即在 RAM 内存中) - 您可以以任何您想要的方式并行化这些消息。

    我真的很想分享链接,但他们的数量超出了网络漏洞。

    更新

    在提交偏移量方面 - 您可以对整个 batch 执行此操作。 一般来说,kafka并不是通过滥用分区数来达到目标​​性能要求,而是依靠batch处理。

    我已经看到很多项目都受到分区扩展的影响(您可能会在稍后看到问题,例如在重新平衡期间)。经验法则 - 首先查看每个可用的 batch 设置。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-08-03
      • 1970-01-01
      • 2016-06-21
      • 2020-09-04
      • 2022-10-26
      • 2021-03-06
      • 1970-01-01
      相关资源
      最近更新 更多