【问题标题】:How does kafka decide which consumer reads a message within a single consumer group?kafka 如何决定单个消费者组中哪个消费者读取消息?
【发布时间】:2021-01-09 09:12:10
【问题描述】:

我想知道是否有任何逻辑可以确定哪个消费者在同一消费者组中读取消息。我有一个主题和一个消费者组。但是,我有一个或多个消费者,因为在生产环境中部署了一个消费者,当我在本地运行我的应用程序时,会创建另一个订阅相同主题的消费者(这是一个测试项目,所以它不是真正的生产,我不担心数据丢失)。我注意到有趣的是,本地消费者总是消费任何给定的消息。所以看起来后面创建的消费者优先。

是否可以配置 kafka 以使之前创建的消费者优先读取?

我的设置包括 3 个代理和 1 个消费者组 ID。此外,此 property auto.offset.reset 设置为 earliest(将其更改为 latest 不能解决问题)。我将这个 Go library 用于卡夫卡。这是我的设置代码:

import (
    "log"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func getConfig() *kafka.ConfigMap {
    return &kafka.ConfigMap{
        "metadata.broker.list": conf.KafkaBrokers,
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        conf.KafkaUsername,
        "sasl.password":        conf.KafkaPassword,
        "group.id":             conf.KafkaGroupID,
        "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
        //"debug":                           "generic,broker,security",
    }
}

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    在一个消费者组中,每个分区都由一个消费者消费。当消费者加入组时,其中一个计算分配,该分配由每个消费者将处理的分区列表组成。

    在您的客户端中,可以通过partition.assignment.strategy 进行配置。这默认为 range,它遵循 Apache Kafka 的 RangeAssignor 的实现。

    引用 Javadoc:

    范围分配器在每个主题的基础上工作。对于每个主题,我们按数字顺序排列可用分区,按字典顺序排列消费者。然后,我们将分区数除以消费者总数,以确定分配给每个消费者的分区数。如果不均分,则前几个消费者会多出一个分区。

    例如,假设有两个消费者 C0 和 C1,两个主题 t0 ​​和 t1,每个主题有 3 个分区,产生分区 t0p0、t0p1、t0p2、t1p0、t1p1 和 t1p2。

    作业将是:

    C0: [t0p0, t0p1, t1p0, t1p1]
    C1: [t0p2, t1p2]
    

    消费者按其成员 ID 排序,该 ID 是在代理端生成的。它基于消费者 client.id 和一个随机 UUID。

    实际上,每个分区分配给哪个消费者并不重要,因此我不会过多关注该部分。相反,重要的是要了解如何分配分区并确定最适合您的用例的策略。

    为了完整起见,confluent-kafka-go 还支持其他策略,例如:roundrobincooperative-sticky

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-10-23
      • 2017-11-09
      • 1970-01-01
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      • 2020-09-19
      • 1970-01-01
      相关资源
      最近更新 更多