【问题标题】:Joining two Kafka streams加入两个 Kafka 流
【发布时间】:2018-11-03 19:11:38
【问题描述】:

尝试加入两个 Kstream,但出现类型不匹配错误,下面是代码 sn-p。

  KStream<String, String> longCounts = netExpence.join(netIncome, (key1, key2) -> key1 + "/" + key2,                    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),stringSerde, stringSerde, stringSerde);

出现的错误是Type mismatch: cannot convert from String to R 这是加入 kstream 的语法join(KStream&lt;K,VO&gt; otherStream, ValueJoiner&lt;? super V,? super VO,? extends VR&gt; joiner, JoinWindows windows, Joined&lt;K,V,VO&gt; joined)

请解释一下ValueJoiner&lt;? super V,? super VO,? extends VR&gt; 的作用。谢谢

【问题讨论】:

    标签: apache-kafka kafka-consumer-api kafka-producer-api apache-kafka-streams apache-kafka-connect


    【解决方案1】:

    使用匹配记录的两个值调用ValueJoiner,并发出一个连接结果值。

    // key type must be the same for a join
    // value type can be different
    KStream<KeyType, ValueType1> stream1 = ...
    KStream<KeyType, ValueType2> stream2 = ...
    
    KStream<KeyType, OutputType> joined = stream1.join(stream2, ...);
    

    因此,ValueJoiner 必须将 ValueType1 作为第一个泛型 (? super V),并将 ValueType2 作为第二个泛型 (? super VO)。对于第三个泛型 (? extend VR),您指定输出类型(即上面示例中的 OutputType)。

    更新

    您还需要为运行时配置正确的 Serdes。如果所有类型都相同,最好通过 StreamsConfig 为键和/或值相应地设置默认 serdes。否则,您可以覆盖每个运算符的默认 Serdes:

    【讨论】:

    • 谢谢,但是所有三个 V,VO wnd VR 的数据类型都是字符串..我也提到过相同的但出现类型转换错误..
    • 不确定。您的问题没有显示 netIncome - 我猜是 KStream&lt;String,String&gt; ?你能分享完整的编译器错误吗?我不清楚为什么它说String to R - 不知道R 是什么...
    • 这是 netIncome 和 netExpence KStream netExpence 和 KStream netIncome 的键值数据类型
    • 请检查您的默认 serdes 键和值是什么?您可以尝试将其设置为 Serdes.String。否则,在执行连接时提供 serdes。
    猜你喜欢
    • 1970-01-01
    • 2018-12-10
    • 1970-01-01
    • 2018-11-07
    • 1970-01-01
    • 2018-03-17
    • 2020-11-04
    • 2020-03-28
    • 2019-10-11
    相关资源
    最近更新 更多