【问题标题】:Kafka Streams API: Session Window incompatible typesKafka Streams API:会话窗口不兼容的类型
【发布时间】:2020-09-05 13:34:49
【问题描述】:

我有以下sn-p:

groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

KTable<byte[], byte[]> mergedTable =
        groupedStream
            .reduce((aggregateValue, newValue) -> {
              try {
                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                aggregateMap.forEach(recentMap::putIfAbsent);
                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
              } catch (Exception e) {
                LOG.warn("Couldn't aggregate key grouped stream\n", e);
              }
              return newValue;
            }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .suppress(Suppressed.untilWindowCloses(unbounded()));

我收到以下编译异常:

Error:(164, 63) java: incompatible types: org.apache.kafka.streams.kstream.Suppressed<org.apache.kafka.streams.kstream.Windowed> cannot be converted to org.apache.kafka.streams.kstream.Suppressed<? super byte[]>

我知道如果我像这样内联windowedBy

        KTable<Windowed<byte[]>, byte[]> mergedTable =
                groupedStream
                        .windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO))
                        .reduce((aggregateValue, newValue) -> {
                            try {
                                Map<String, String> recentMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                Map<String, String> aggregateMap = MAPPER.readValue(new String(newValue), HashMap.class);
                                aggregateMap.forEach(recentMap::putIfAbsent);
                                newValue = MAPPER.writeValueAsString(recentMap).getBytes();
                            } catch (Exception e) {
                                LOG.warn("Couldn't aggregate key grouped stream\n", e);
                            }
                            return newValue;
                        }, Materialized.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .suppress(Suppressed.untilWindowCloses(unbounded()));

它有效,但我不知道如何分离和拆分这两个......

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api apache-kafka-streams kafka-producer-api


    【解决方案1】:

    这里有两个问题。

    第一个问题是 KGroupedStream.windowedBy(SessionWindows) 返回一个 SessionWindowedKStream&lt;K, V&gt; 的实例,在您的第一个示例中

    groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));

    您没有在变量中捕获返回的SessionWindowedKStream

    第二个问题是在你的第一个代码示例中

    KTable&lt;byte[], byte[]&gt; mergedTable

    应该是什么时候

    KTable&lt;Windowed&lt;byte[]&gt;, byte[]&gt; mergedTable

    就像你的第二个例子一样。

    如果你把代码改成

    SessionWindowedKStream<byte[], byte[]> sessionWindowedKStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));
    
    KTable<Windowed<byte[]>, byte[]> mergedTable = 
          sessionWindowedKStream
                    .reduce((aggregateValue, newValue) -> {...
    

    那么它应该可以正常编译了。

    HTH 比尔

    【讨论】:

    • 该解决方案有效,而且确实有效。我不确定 serdes 是否错误,因为当我之前尝试过它时,在尝试内联解决方案之前,由于某种原因我遇到了以下问题:stackoverflow.com/questions/61883482/…
    • 我看了你的评论,我不确定,但我会看看。
    • 我还推测通常将其状态存储在“某处的文件”中,并且一旦设置并保留了正确类型的 WindowedStore(或某处的某些 SerDe 未正确序列化),它就不会不再抛出错误。
    猜你喜欢
    • 2020-09-05
    • 1970-01-01
    • 1970-01-01
    • 2018-07-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-10
    • 1970-01-01
    相关资源
    最近更新 更多