【问题标题】:Is it possible to get the latest value for a message key from kafka messages是否可以从 kafka 消息中获取消息键的最新值
【发布时间】:2020-04-01 06:37:42
【问题描述】:

假设我对同一个消息键有不同的值。

例如:

{
userid: 1,
email: user123@xyz.com }

{
userid: 1,
email: user456@xyz.com }

{
userid: 1,
email: user789@xyz.com }

在上述情况下,我只想要用户更新的最新值,即“user789@xyz.com”。

我的 kafka 流应该只给我第三个值,而不是前两个值。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-kafka confluent-platform ksqldb


    【解决方案1】:

    由于您没有指定特定的客户端,我将向您展示如何使用 ksqlDB 和新添加的函数 LATEST_BY_OFFSET 来完成此操作。

    首先,我用源数据填充主题:

    kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
    1:{ "userid": 1, "email": "user123@xyz.com" }
    1:{ "userid": 1, "email": "user456@xyz.com" }
    1:{ "userid": 1, "email": "user789@xyz.com" }
    EOF
    

    然后在 ksqlDB 中首先将其建模为事件流:

    ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
    
     Message
    ----------------
     Stream created
    ----------------
    
    ksql> SET 'auto.offset.reset' = 'earliest';                                                                                                                                                                                                                                         [35/60]
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
    ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
    +---------+---------+-----------------+
    |ROWKEY   |USERID   |EMAIL            |
    +---------+---------+-----------------+
    |1        |1        |user123@xyz.com  |
    |1        |1        |user456@xyz.com  |
    |1        |1        |user789@xyz.com  |
    

    现在我们可以直接告诉 ksqlDB 获取这个事件流并只给我们最新的值(基于偏移量):

    ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
    +--------------------+--------------------+
    |USERID              |KSQL_COL_1          |
    +--------------------+--------------------+
    |1                   |user789@xyz.com     |
    
    Press CTRL-C to interrupt
    

    或者更有用的是,作为 ksqlDB 中的物化状态:

    CREATE TABLE USER_LATEST_STATE AS 
        SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
          FROM USER_UPDATES 
         GROUP BY USERID 
         EMIT CHANGES;
    

    此表仍由 Kafka 主题的更改驱动,但可以直接查询当前的状态,无论是现在(“拉查询”):

    ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
    +--------------------+
    |EMAIL               |
    +--------------------+
    |user789@xyz.com     |
    Query terminated
    ksql>
    

    或作为状态演变的变化流(“推送查询”):

    ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
    +--------------------+
    |EMAIL               |
    +--------------------+
    |user789@xyz.com     |
    
    [ query continues indefinitely ]
    

    【讨论】:

    • 创建表时,我看到多条记录具有相同的键。这是故意的吗?
    【解决方案2】:

    您似乎想在进一步处理之前缓冲记录。由于在流式传输中,您拥有不断增长的无限数据集,因此您永远不知道是要等待更多记录还是刷新缓冲区以进行进一步处理。您能否添加更多关于您将如何处理这些记录的详细信息?

    您可以引入一个附加参数,即在刷新缓冲区之前要等待的最长时间。要归档它,您可以use a Session window or a Tumbling window,或使用records cache in associate with a commit interval,或者您也可以使用 Kafka 低级处理器 API 实现它。

    这里是示例代码,展示了如何使用 Tumbling 窗口将其存档,以在 1 小时时间窗口内聚合和抑制所有 userId 信息,接受 10 分钟后迟到的事件,然后将抑制的事件发送到下游处理器(如果你使用这个may not get the final results,直到有新活动到来):

    userInfoKStream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
        .aggregate(() -> "", (userId, newValue, currentValue) -> newValue)
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((userId, value) -> {});
    

    【讨论】:

      【解决方案3】:

      您需要Kafka log compaction。简而言之,如果您希望您的主题只保留特定键的最后一个值,您应该设置属性log.cleanup.policy=compact。你可以在here找到更多相关信息。

      【讨论】:

      • 日志压缩是一种清理机制,用于删除具有相同密钥的旧消息,但它不能帮助您获取特定密钥的“当前最新消息”。
      猜你喜欢
      • 1970-01-01
      • 2020-03-02
      • 1970-01-01
      • 2015-08-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-12
      • 2019-02-10
      相关资源
      最近更新 更多