【问题标题】:Is key required as part of sending messages to Kafka?向 Kafka 发送消息时是否需要密钥?
【发布时间】:2015-06-13 05:26:05
【问题描述】:
KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message)); 
producer.send(keyedMessage);

目前,我正在发送没有任何密钥的消息作为密钥消息的一部分,它仍然可以与delete.retention.ms 一起使用吗?我需要在消息中发送密钥吗?将密钥作为消息的一部分这样好吗?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    如果您需要强大的密钥顺序并且正在开发诸如状态机之类的东西,则密钥通常是有用/必要的。如果您要求始终以正确的顺序查看具有相同键(例如,唯一 id)的消息,则将键附加到消息将确保具有相同键的消息始终进入主题中的同一分区。 Kafka 保证分区内的顺序,但不保证主题中的分区之间的顺序,因此,或者不提供密钥(这将导致跨分区的循环分配)将不会保持这种顺序。

    在状态机的情况下,密钥可以与 log.cleaner.enable 一起使用,以对具有相同密钥的条目进行重复数据删除。在这种情况下,Kafka 假设您的应用程序只关心给定键的最新实例,并且日志清理器仅在该键不为空时才删除给定键的旧副本。这种形式的日志压缩由 log.cleaner.delete.retention 属性控制,并且需要密钥。

    或者,默认启用的更常见的属性 log.retention.hours 通过删除已过期的完整日志段来工作。在这种情况下,不必提供密钥。 Kafka 将简单地删除超过给定保留期的日志块。

    就是这么说,如果您启用了log compaction 或要求对具有相同密钥的消息进行严格排序,那么您绝对应该使用密钥。否则,在某些键可能比其他键出现得更多的情况下,空键可能会提供更好的分布并防止潜在的热点问题。

    【讨论】:

    • 我是 Kafka 新手,这就是问这么多问题的原因:有几个问题:第一个问题,我们可以在关键基础上使用消息吗,目前我正在使用来自 MessagAndMetadata mm 的消息.或者在消费消息时忽略密钥是否可以。我正在使用高级消费者 API。
    • @kuujo 我假设这种重复数据删除仅适用于日志条目,它不一定对主题队列上的消息进行重复数据删除?
    • @oblivion 让消息按顺序进入同一分区对于处理非幂等更新非常重要,例如客户选择交货日期(一条消息)但稍后改变主意(第二条消息)。如果消息要发送到不同的分区,则可以首先/最后处理任一消息,例如每个分区有 2 个消费者消费。如果与同一 Delivery 相关的两条消息都进入同一个分区,则它们将被先进先出处理,并给出正确的最终交付日期。
    • 顺序保证不是来自密钥,而是来自同一分区中的消息。消息到分区的路由不必是基于键的。创建ProducerRecord时可以显式指定分区@
    • 我的理解是生产者客户端负责选择分区(kafka.apache.org/documentation.html#design_loadbalancing),可能基于也可能不基于key。那你为什么说订购需要钥匙呢?
    【解决方案2】:

    tl;博士 不,在向 Kafka 发送消息时不需要密钥。但是……


    除了非常有用的公认答案之外,我还想补充一些细节

    分区

    默认情况下,Kafka 使用消息的键来选择它写入的主题的分区。这是在DefaultPartitioner by

    中完成的
    kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    

    如果没有提供密钥,则 Kafka 将以循环方式对数据进行分区。

    在 Kafka 中,可以通过扩展 Partitioner 类来创建自己的 Partitioner。为此,您需要覆盖具有签名的partition 方法:

    int partition(String topic, 
                  Object key,
                  byte[] keyBytes,
                  Object value,
                  byte[] valueBytes,
                  Cluster cluster)
    

    通常,Kafka消息的key用于选择分区,返回值(int类型)为分区号。如果没有密钥,您需要依赖可能更复杂处理的值。

    订购

    如给定答案中所述,Kafka 仅在分区级别保证消息的排序。

    假设您想将客户的金融交易存储在具有两个分区的 Kafka 主题中。消息可能看起来像(键:值)

    null:{"customerId": 1, "changeInBankAccount": +200}
    null:{"customerId": 2, "changeInBankAccount": +100}
    null:{"customerId": 1, "changeInBankAccount": +200}
    null:{"customerId": 1, "changeInBankAccount": -1337}
    null:{"customerId": 1, "changeInBankAccount": +200}
    

    由于我们没有定义键,所以两个分区可能看起来像

    // partition 0
    null:{"customerId": 1, "changeInBankAccount": +200}
    null:{"customerId": 1, "changeInBankAccount": +200}
    null:{"customerId": 1, "changeInBankAccount": +200}
    
    // partition 1
    null:{"customerId": 2, "changeInBankAccount": +100}
    null:{"customerId": 1, "changeInBankAccount": -1337}
    

    您阅读该主题的消费者最终可能会告诉您该帐户的余额在特定时间为 600,尽管情况并非如此!只是因为它在分区 1 中的消息之前读取了分区 0 中的所有消息。

    使用有意义的键(如 customerId)可以避免这种情况,因为分区是这样的:

    // partition 0
    1:{"customerId": 1, "changeInBankAccount": +200}
    1:{"customerId": 1, "changeInBankAccount": +200}
    1:{"customerId": 1, "changeInBankAccount": -1337}
    1:{"customerId": 1, "changeInBankAccount": +200}
    
    // partition 1
    2:{"customerId": 2, "changeInBankAccount": +100}
    

    请记住,只有将生产者配置 max.in.flight.requests.per.connection 设置为 1 才能保证分区内的排序。但是,该配置的默认值是 5,它被描述为:

    "客户端在阻塞前将在单个连接上发送的未确认请求的最大数量。注意,如果此设置设置为大于 1 并且有发送失败的情况,则存在重新发送消息的风险由于重试而排序(即,如果启用重试)。"

    您可以在 Kafka - Message Ordering Guarantees 上的另一篇 Stackoverflow 帖子中找到更多详细信息。

    日志压缩

    如果没有密钥作为消息的一部分,您将无法将主题配置 cleanup.policy 设置为 compacted。根据documentation,“日志压缩确保 Kafka 将始终为单个主题分区的数据日志中的每个消息键至少保留最后一个已知值。”。

    如果没有任何密钥,这个漂亮而有用的设置将无法使用。

    密钥的使用

    在实际用例中,Kafka 消息的密钥会对您的性能和业务逻辑的清晰度产生巨大影响。

    例如,密钥可以自然地用于对数据进行分区。由于您可以控制您的消费者从特定分区中读取,这可以作为一个有效的过滤器。此外,密钥可以包含一些关于消息实际值的元数据,以帮助您控制后续处理。键通常比值小,因此解析键而不是整个值更方便。同时,您也可以使用密钥应用所有序列化和模式注册。

    作为说明,还有Header的概念,可以用来存储信息,见documentation

    【讨论】:

    • 如果生产者尝试将消息写入 100 个主题分区(例如:数字键范围从 0 到 99 的消息),而代理只有 10 个主题分区,会发生什么情况?消息是否会通过循环使用默认机制分发?
    • @dandev486 不确定我是否正确理解了您的问题。如果您尝试将 PROducerRecord 写入不存在的 TopicPartition,生产者将抛出异常。如果您使用数字键 0 到 99,则消息会根据我的回答中所述的hash(key) % 10 分布在 10 个分区中。
    • @Mike,一个后续问题。我了解向密钥保存者提供消息的顺序,在“所有”情况下都是真的吗?比如说,生产者发送了 (k1,m1,t) 和 (k1,m2,t) 。是否可以保证 m1 始终获得比 m2 低的偏移量(意味着与 m1 相比,m2 将被视为最新消息)
    • :-),这意味着即使我们发送密钥也不能保证始终?
    • @Nag 是的,这是正确的。我会在我的回答中说明这一点。感谢您指出这一点
    【解决方案3】:

    带有消息的键基本上是为了获取特定字段的消息排序而发送的。

    • 如果 key=null,数据将循环发送(发送到不同的分区和分布式环境中的不同代理。当然也发送到同一个主题。
    • 如果发送了一个密钥,那么该密钥的所有消息将始终发送到同一个分区。

    解释和举例

    • key可以是任意字符串或整数等。以整数employee_id为例。
    • 因此 emplyee_id 123 将始终进入分区 0,emplyee_id 345 将始终进入分区 1。这是由取决于分区数量的密钥哈希算法决定的。
    • 如果您不发送任何密钥,则消息可以使用循环技术发送到任何分区。

    【讨论】:

    • 将消息发送到相同的分区并不能保证始终排序?
    • 保证同一分区内的顺序。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-16
    • 2021-03-16
    • 1970-01-01
    • 2018-01-10
    • 1970-01-01
    • 2012-07-29
    相关资源
    最近更新 更多