【问题标题】:Print active users count from the windowed kafka stream to the console将窗口化 kafka 流中的活动用户数打印到控制台
【发布时间】:2018-07-15 17:12:10
【问题描述】:

我正在尝试聚合流以获取窗口流中 user_id 的计数。 Stream 没有 key ,因此需要从 value 中获取 user_id 并聚合并将该窗口内的活动用户数打印到控制台/api。 代码如下:

        final KStream<String, avroschema> feeds = builder.stream("input_topic");
final KTable<String, Long> aggregated = feeds
            // map the user id as key
            .map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
            .groupByKey()
            .count("state_store");
aggregated.print();

我得到的输出是:

[KSTREAM-AGGREGATE-0000000002]: 123 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 456 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 789 , (1<-null)

我怎样才能只打印输出中的计数,如下所示?

 user_count 3

我试图得到如下计数:

   KTable<Windowed<String>, Long> countUsers = feeds
            // map the user name as key, because the subsequent counting is performed based on the key
            .map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
            // count users, using one-minute tumbling windows
            .countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L))

但它显示如下错误。这有什么问题?

Cannot resolve method 'countByKey(org.apache.kafka.streams.kstream.TimeWindows)'

【问题讨论】:

  • 您是否将用户活动存储在 Kafka 中?为什么不在 Redis 存储中然后将数据拉入 Kafka?

标签: apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams apache-kafka-connect


【解决方案1】:

如果您将用户 ID 设置为键,则您计算每个用户出现的频率。这个计数显然是1

如果要统计所有用户,则需要设置一个对所有要统计的记录都相同的“虚拟键”。

对于编译错误:这只是错误的代码。阅读文档:https://kafka.apache.org/10/documentation/streams/developer-guide/dsl-api.html#id12

不确定您使用的是什么版本,但 .countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L)) 是旧 API,在 0.11 版本中已更改。

【讨论】:

    猜你喜欢
    • 2017-01-12
    • 1970-01-01
    • 1970-01-01
    • 2016-09-25
    • 1970-01-01
    • 1970-01-01
    • 2019-12-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多