【问题标题】:How to materialize a *windowed* KTable into a Kafka topic如何将 *windowed* KTable 实体化为 Kafka 主题
【发布时间】:2018-05-04 13:11:50
【问题描述】:

我正在编写一个 KafkaStreams 应用程序,该应用程序从一个主题中获取字符串值,我想输出过去 5 分钟内某个键的值的串联,每分钟更新到另一个(压缩的)Kafka 主题。我有一种感觉,我快到了,但我还没有成功。 我用一个简单的方法进行了测试:

grouped_transactions.toStream().foreach((key, value) -> {
    System.out.println(key.window().toString()+ key.key() + "    "+ value);
});

这给了我类似于您在下面看到的内容(我已按源主题键 00909 过滤以简化调试)我不想要的是 所有具有相同连接值的不同 Windows,我只想要我的扩展字符串连接。

Window{start=1525437120000, end=1525437420000}00909    "ABC",-554.53
Window{start=1525437360000, end=1525437660000}00909    "ABC",-554.53
Window{start=1525437240000, end=1525437540000}00909    "ABC",-554.53
Window{start=1525437300000, end=1525437600000}00909    "ABC",-554.53
Window{start=1525437180000, end=1525437480000}00909    "ABC",-554.53
Window{start=1525437120000, end=1525437420000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437180000, end=1525437480000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437240000, end=1525437540000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437300000, end=1525437600000}00909    "ABC",-554.53;"ABC",646.03
Window{start=1525437360000, end=1525437660000}00909    "ABC",-554.53;"ABC",646.03

以下是所有代码。任何人都知道如何做到这一点? 提前致谢!

Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

StreamsBuilder builder = new StreamsBuilder();

long windowSizeMs = TimeUnit.MINUTES.toMillis(5); // 5 * 60 * 1000L
long advanceMs =    TimeUnit.MINUTES.toMillis(1); // 1 * 60 * 1000L
TimeWindows window = TimeWindows.of(windowSizeMs).advanceBy(advanceMs);
KTable<Windowed<String>, String> grouped_transactions = source
        .filter((k,v)->k.equals("00909"))
        .groupByKey()
        .windowedBy(window)
        .reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("grouped_transactions_5_1"));

// THIS FAILS on runtime with
// java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed  
// cannot be cast to java.lang.String
grouped_transactions.toStream().to(GROUPEDTRANSACTIONS);


final KafkaStreams streams = new KafkaStreams(builder.build(), props);

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    我不想要的是所有不同的 Windows 具有相同的连接值,我只想要我的扩展字符串连接。

    因为您指定重叠窗口,单个记录可以包含在多个窗口实例中。也许,您想指定不重叠的窗口,即带有size == advance 的窗口。

    【讨论】: