【发布时间】:2017-08-13 16:56:39
【问题描述】:
我有一个 Kafka 主题,我在其中发送位置事件(key=user_id,value=user_location)。我可以将其作为KStream 阅读和处理:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
这很好用,但我希望有一个KTable,其中包含每个用户的最后一个已知位置。我该怎么办?
我能够读写一个中间主题:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
有没有从KStream 获取KTable 的简单方法?这是我第一个使用 Kafka Streams 的应用程序,所以我可能遗漏了一些明显的东西。
【问题讨论】: