【问题标题】:Apache Kafka Producer Broker ConnectionApache Kafka 生产者代理连接
【发布时间】:2016-12-18 01:09:34
【问题描述】:

我有一组作为集群运行的 Kafka 代理实例。我有一个正在向 Kafka 生成数据的客户端:

props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");

当我们使用 tcpdump 进行监控时,我可以看到只有与 broker1 和 broker2 的连接已建立,而对于 broker3,我的生产者没有连接。我有一个只有一个分区的主题。

我的问题:

  1. 代理数量和主题分区之间的关系如何?我是否应该始终拥有代理数 = 分区数?

  2. 为什么在我的情况下,我无法连接到 broker3?或者至少我的网络监控没有显示我的 Producer 与 broker3 建立了连接?

如果我能从生产者的角度更深入地了解与代理的连接是如何工作的,那就太好了。

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    显然,您的生产者不需要连接到broker3 :)

    我将尝试向您解释当您向 Kafka 生成数据时会发生什么:

    1. 您启动了一些代理,比如说 3,然后创建一些主题 foo,其中包含 2 个分区,复制因子为 2。非常简单的示例,但对于某人来说可能是一个真实的案例。
    2. 您创建一个生产者,并为这些代理配置了metadata.broker.list(或新生产者中的bootstrap.servers)。值得一提的是,您不必指定集群中的所有代理,实际上您只需指定其中的 1 个,它仍然可以工作。我也会稍微解释一下。
    3. 您使用您的生产者向主题foo 发送消息。
    4. 生产者查找其本地元数据缓存以查看主题foo 的每个分区的领导者以及您的foo 主题有多少个分区。由于这是第一次发送给生产者,本地缓存不包含任何内容。
    5. 生产者按顺序向metadata.broker.list 中的每个代理发送TopicMetadataRequest,直到第一次成功响应。这就是为什么我提到该列表中的 1 个经纪人只要它还活着就可以工作。
    6. 返回的TopicMetadataResponse 将包含有关请求主题的信息,在您的情况下是foo 和集群中的代理。基本上,此响应包含以下内容:
      • 集群中的代理列表,其中每个代理都有一个ID、主机和端口。此列表可能不包含集群中的完整代理列表,但至少应包含负责为主题主题提供服务的代理列表。
      • 主题元数据列表,其中每个条目都有主题名称、分区数、每个分区的领导代理 ID 和每个分区的 ISR 代理 ID。
    7. 基于TopicMetadataResponse,您的生产者构建其本地缓存,现在确切地知道主题foo 分区0 的请求应该发送到代理X。
    8. 根据主题中的分区数,生产者对您的消息进行分区并累积它,并且知道它应该作为批处理的一部分发送给某个代理。
    9. 当批次已满或linger.ms 超时通过时,您的生产者将批次刷新到代理。 “刷新”是指“打开与代理的新连接或重用现有连接,并发送ProduceRequest”。

    生产者不需要打开与所有代理的不必要的连接,因为某些代理可能无法为您生成的主题提供服务,并且您的集群可能非常大。想象一个有很多主题的 1000 个代理集群,但其中一个主题只有一个分区 - 您只需要一个连接,而不是 1000 个。

    在您的特定情况下,如果您只有一个分区,我不能 100% 确定为什么您有 2 个与代理的打开连接,但我假设一个连接在元数据发现期间打开并被缓存以供重用,而第二个连接一种是产生数据的实际代理连接。但是,在这种情况下,我可能错了。

    但无论如何,根本不需要为第三个代理建立连接。

    关于您关于“我应该始终拥有经纪人数量 = 分区数量”的问题吗?答案很可能是否定的。如果您解释您要达到的目标,也许我可以为您指出正确的方向,但这太宽泛而无法解释。我建议阅读this 以澄清事情。

    UPD 回答评论中的问题:

    元数据缓存更新分两种情况:

    1. 如果生产者因任何原因无法与代理通信 - 这包括代理根本无法访问以及代理响应错误(例如“我不再是此分区的领导者,请继续离开”)

    2. 如果没有发生故障,客户端仍会每隔metadata.max.age.ms (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43) 刷新元数据以发现新的代理和分区本身。

    【讨论】:

    • 解释得很好!非常感谢!
    • 一个问题,Producer 缓存使用来自代理集的元数据更新的频率如何?
    • 太棒了!再次感谢您的澄清!
    猜你喜欢
    • 1970-01-01
    • 2017-12-18
    • 1970-01-01
    • 2018-06-16
    • 2016-06-11
    • 2019-11-15
    • 1970-01-01
    • 1970-01-01
    • 2017-11-03
    相关资源
    最近更新 更多