【发布时间】:2018-07-15 17:12:10
【问题描述】:
我正在尝试聚合流以获取窗口流中 user_id 的计数。 Stream 没有 key ,因此需要从 value 中获取 user_id 并聚合并将该窗口内的活动用户数打印到控制台/api。 代码如下:
final KStream<String, avroschema> feeds = builder.stream("input_topic");
final KTable<String, Long> aggregated = feeds
// map the user id as key
.map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
.groupByKey()
.count("state_store");
aggregated.print();
我得到的输出是:
[KSTREAM-AGGREGATE-0000000002]: 123 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 456 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 789 , (1<-null)
我怎样才能只打印输出中的计数,如下所示?
user_count 3
我试图得到如下计数:
KTable<Windowed<String>, Long> countUsers = feeds
// map the user name as key, because the subsequent counting is performed based on the key
.map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
// count users, using one-minute tumbling windows
.countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L))
但它显示如下错误。这有什么问题?
Cannot resolve method 'countByKey(org.apache.kafka.streams.kstream.TimeWindows)'
【问题讨论】:
-
您是否将用户活动存储在 Kafka 中?为什么不在 Redis 存储中然后将数据拉入 Kafka?
标签: apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams apache-kafka-connect