[Question title]: Scale spring-kafka consumers app horizontally
[Posted]: 2021-01-28 23:09:56
[Question description]:

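I am wondering what a good approach is for configuring the number of partitions in relation to the maximum number of horizontally scaled instances.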

Suppose I have one topic with 6 partitions.

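I have one application that uses ConcurrentKafkaListenerContainerFactory with setConcurrency set to 6. That means I will have 6 KafkaMessageListenerContainer instances, each using one thread, consuming messages evenly from all of my partitions.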

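If the above is correct, I was wondering what will happen if I scale the app horizontally by adding another instance. If the new instance has the same concurrency of 6 and, of course, the same consumer group, I believe the 2nd instance will not be consuming any messages, because no rebalance will occur: each of the existing consumers will already have one partition assigned to it.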

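But what if we go back to the first example and have 6 partitions with one instance whose concurrency is 3? Each consumer thread/KafkaMessageListenerContainer will then have 2 partitions assigned. If we scale this app (same consumer group id and also a concurrency of 3), I believe a rebalance will occur and the two instances will each consume from 3 partitions.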

Are these assumptions correct, and if not, how should you handle such a case?

[Question comments]:

    Tags: spring apache-kafka spring-kafka


    [Solution 1]:

    In general, your assumptions are correct for the default behavior, which is based on the RangeAssignor:

    /**
     * <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
     * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
     * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
     * divide, then the first few consumers will have one extra partition.
     *
     * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
     * <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
     * <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
     *
     * <p>The assignment will be:
     * <ul>
     * <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
     * <li><code>C1: [t0p2, t1p2]</code></li>
     * </ul>
     *
     * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
     * For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
     * for example <code>C0</code> -&gt; <code>C3</code> <code>C1</code> -&gt; <code>C2</code>.
     *
     * <p>The assignment could be completely shuffled to:
     * <ul>
     * <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
     * <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
     * </ul>
     *
     * The assignment change was caused by the change of <code>member.id</code> relative order, and
     * can be avoided by setting the group.instance.id.
     * Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
     * 1. Number of members remain the same across generation
     * 2. Static members' identities persist across generation
     * 3. Subscription pattern doesn't change for any member
     *
     * <p>The assignment will always be:
     * <ul>
     * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
     * <li><code>I1: [t0p2, t1p2]</code>
     * </ul>
     */
    public class RangeAssignor extends AbstractPartitionAssignor {
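
To make the Javadoc above concrete, here is a minimal sketch of the range calculation for a single topic. It is not the Kafka implementation: the class name RangeAssignmentSketch and the consumer ids are hypothetical, and it ignores multiple topics, static membership, and subscription differences.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of range-style assignment for ONE topic.
final class RangeAssignmentSketch {

    // Assigns partitions 0..numPartitions-1 to the consumers in lexicographic order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);

        int perConsumer = numPartitions / sorted.size(); // base share
        int extra = numPartitions % sorted.size();       // first 'extra' consumers get one more

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // One topic from the Javadoc example: 3 partitions, 2 consumers.
        System.out.println(assign(List.of("C0", "C1"), 3));           // {C0=[0, 1], C1=[2]}
        // The question's second scenario: 6 partitions, 3 consumer threads.
        System.out.println(assign(List.of("c-0", "c-1", "c-2"), 6));  // {c-0=[0, 1], c-1=[2, 3], c-2=[4, 5]}
    }
}
```

With more consumers than partitions, the base share is 0 and only the first few consumers in sort order receive a partition; the rest stay idle.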
    

    However, you can plug in any ConsumerPartitionAssignor via the partition.assignment.strategy consumer property: https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
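
As a rough sketch of what setting that property can look like, the snippet below builds a plain consumer property map that selects Kafka's RoundRobinAssignor. The class name AssignorConfigSketch, the broker address, and the group id are placeholders, not values from the question; in a spring-kafka application a map like this would typically be handed to the consumer factory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property keys and the assignor class name come from Kafka.
final class AssignorConfigSketch {

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // placeholder consumer group
        // Replace the default assignor with the round-robin one.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().get("partition.assignment.strategy"));
    }
}
```

All members of a consumer group should be configured with a compatible strategy, so a change like this is usually rolled out to every instance.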

    See also the ConsumerPartitionAssignor JavaDocs for more information and for its implementations, so you can choose one for your use case.

    [Comments]:

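    • > I believe the 2nd instance will not be consuming any messages - It's not that simple; a rebalance will occur and the partitions will be distributed across any 6 of the 12 consumers, while the other 6 will be idle. Generally, you should over-provision the number of partitions. Say you set it to 18; with a concurrency of 6 per instance, this will support 1 to 3 instances, and in the first case each consumer will get 3 partitions. It gets more complicated if you consume from multiple topics that have different partition counts, or if the total concurrency is greater than the number of partitions in one topic.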
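
The arithmetic in the comment above can be checked with a small sketch. It assumes a single topic and the same concurrency on every instance; ScalingSketch and its method names are made up for illustration.

```java
// Hypothetical capacity check for one topic shared by identical instances.
final class ScalingSketch {

    // Total consumer threads in the group.
    static int totalConsumers(int instances, int concurrency) {
        return instances * concurrency;
    }

    // Consumers left without a partition once every partition has an owner.
    static int idleConsumers(int partitions, int instances, int concurrency) {
        return Math.max(0, totalConsumers(instances, concurrency) - partitions);
    }

    // Largest number of partitions any single consumer receives (ceiling division).
    static int maxPartitionsPerConsumer(int partitions, int instances, int concurrency) {
        int total = totalConsumers(instances, concurrency);
        return (partitions + total - 1) / total;
    }

    public static void main(String[] args) {
        int concurrency = 6;
        for (int partitions : new int[] {6, 18}) {
            for (int instances = 1; instances <= 3; instances++) {
                System.out.printf("partitions=%d instances=%d idle=%d maxPerConsumer=%d%n",
                        partitions, instances,
                        idleConsumers(partitions, instances, concurrency),
                        maxPartitionsPerConsumer(partitions, instances, concurrency));
            }
        }
    }
}
```

With 6 partitions, a second instance leaves 6 of the 12 consumers idle; with 18 partitions, no consumer is idle for 1 to 3 instances.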