【问题标题】:Adding timestamp in kafka message payload在 kafka 消息有效负载中添加时间戳
【发布时间】:2013-09-16 15:29:35
【问题描述】:

有什么方法可以在 Kafka 消息有效负载中添加时间戳标头?我想检查消息何时在消费者端创建并基于此应用自定义逻辑。

编辑

我正在尝试找到一种将一些自定义值(基本上是时间戳)附加到生产者发布的消息的方法,以便我可以在特定的持续时间内使用消息。现在 Kafka 只确保消息按照放入队列的顺序传递。但在我的情况下,先前生成的记录可能会在一定延迟后到达(因此在时间 T1 生成的消息可能具有比在稍后时间 T2 生成的偏移量为 0 的消息更高的偏移量 1)。出于这个原因,它们不会按照我在消费者端的预期顺序。所以我基本上是在寻找一种出路,以有序的方式消费它们。

当前的 Kafka 0.8 版本无法在生产者端附加“消息密钥”以外的任何其他内容,找到了一个类似的主题 here,建议在消息有效负载中对​​其进行编码。但是我做了很多搜索,但找不到可能的方法。

另外我不知道这种方法是否对 Kafka 的整体性能有任何影响,因为它在内部管理消息偏移量,并且从 this 页面中可以看出目前没有公开这样的 API

如果我的想法完全正确,或者有任何可能的方法,我真的很感激任何线索,我都准备试一试

【问题讨论】:

    标签: message-queue apache-kafka


    【解决方案1】:

    如果您想在特定时间段内使用消息,那么我可以为您提供解决方案,但是从该时间段开始按顺序使用消息是很困难的。我也在寻找相同的解决方案。检查以下链接

    Message Sorting in Kafka Qqueue

    获取特定时间数据的解决方案

    对于时间 T1,T2,...TN ,其中 T 是时间范围;将主题划分为 N 个分区。现在使用 Partitioner Class 生成消息,这样消息生成时间应该用于决定该消息应该使用哪个分区。

    同样,在消费时订阅您想要消费的时间范围内的确切分区。

    【讨论】:

    • true,尝试在生产时使用消息有效负载添加一些属性,但如果您尝试传递任何期望字符串,Kafka 0.8 似乎会引发异常。找到类似的JIRA here
    • 是的,但是我建议不要在您的消息中添加属性,而是使用当前时间戳作为您的分区键来划分您的消息。请检查我在此link 中为生产者和分区类提供的答案。
    • 其实,我关心的更多的是消费时的顺序,你说得对,这会将它们划分为特定的分区。我正在考虑添加时间戳,以便我可以以某种方式使它们在生产时间排序(这样我就可以知道偏移量为 0 的事件 T0 是否在偏移量为 1 的 T1 之前发生),我想这可能是一个非常疯狂的猜测 :)
    • @user2720864 如果您找到解决上述问题的方法,请告诉我
    【解决方案2】:

    您可以创建一个包含分区信息和创建此消息时的时间戳的类,然后将其用作 Kafka 消息的键。然后,您可以使用包装器 Serde 将此类转换为字节数组并返回,因为 Kafka 只能理解字节。然后,当您在消费者端收到作为一袋字节的消息时,您可以对其进行反序列化并检索时间戳,然后将其引导到您的逻辑中。

    例如:

    public class KafkaKey implements Serializable {
        private long mTimeStampInSeconds;
        /* This contains other partitioning data that will be used by the
        appropriate partitioner in Kafka. */
        private PartitionData mPartitionData;
    
        public KafkaKey(long timeStamp, ...) {
            /* Initialize key */
            mTimeStampInSeconds = timestamp;
        }
    
        /* Simple getter for timestamp */
        public long getTimeStampInSeconds() {
            return mTimeStampInSeconds;
        }
    
        public static byte[] toBytes(KafkaKey kafkaKey) {
            /* Some serialization logic. */
        }
    
        public static byte[] toBytes(byte[] kafkaKey) throws Exception {
            /* Some deserialization logic. */
        }
    }
    
    /* Producer End */
    
    KafkaKey kafkaKey = new KafkaKey(System.getCurrentTimeMillis(), ... );
    KeyedMessage<byte[], byte[]> kafkaMessage = new KeyedMessage<>(topic, KafkaKey.toBytes(kafkaKey), KafkaValue.toBytes(kafkaValue));
    
    /* Consumer End */
    MessageAndMetadata<byte[],byte[]> receivedMessage = (get from consumer);
    KafkaKey kafkaKey = KafkaKey.fromBytes(receivedMessage.key());
    
    long timestamp = kafkaKey.getTimeStampInSeconds();
    /*
     * And happily ever after */
    

    这将比使特定分区对应于时间间隔更灵活。否则,您将不得不不断为不同的时间范围添加分区,并为哪个分区对应哪个时间范围保留一个单独的、同步的表格,这很快就会变得笨拙。

    【讨论】:

      【解决方案3】:

      This 看起来可以帮助您实现目标。它允许您轻松定义和编写隐藏(反)序列化负担的消息标题。您唯一需要提供的是您通过线路发送的实际对象的(反)序列化器。这种实现实际上尽可能地延迟了有效负载对象的反序列化过程,这意味着您可以(以一种非常高效和透明的方式)反序列化标头,检查时间戳,并且仅在当/何时反序列化有效负载(重位)您确定该对象对您有用。

      【讨论】:

        【解决方案4】:

        注意,根据以下讨论,Kafka 在消息的内部表示中引入了时间戳: https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

        还有这些门票: https://issues.apache.org/jira/browse/KAFKA-2511

        它应该在所有版本的 Kafka 0.10.0.0 及更高版本中都可用。

        这里的问题是您以不再需要的顺序摄取消息。如果订单很重要,那么您需要放弃相关生产者中的并行性。然后消费者级别的问题就消失了。

        【讨论】:

          猜你喜欢
          • 2019-04-18
          • 2017-11-12
          • 1970-01-01
          • 1970-01-01
          • 2019-07-28
          • 2021-08-17
          • 1970-01-01
          • 1970-01-01
          • 2013-09-19
          相关资源
          最近更新 更多