【问题标题】:Kafka Streams Materialized Store Build ErrorKafka Streams物化商店构建错误
【发布时间】:2019-01-06 14:30:06
【问题描述】:

我正在尝试在此处构建 Materialized.as DSL 代码:https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html

但我得到了错误

incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>

上线

.withKeySerde(Serdes.Long())

有人知道这里可能出了什么问题吗?

final StreamsBuilder builder = new StreamsBuilder();

   KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
   KTable<Long,String> dataStore = builder.table(
     "example_stream",
     Materialized.as(storeSupplier)
             .withKeySerde(Serdes.Long())
             .withValueSerde(Serdes.String()));

【问题讨论】:

  • 请添加拓扑/状态存储建设代码
  • 请添加代码。
  • 抱歉,我添加了构建商店的代码。
  • minimal reproducible example。添加builder 定义,以及builder 使用的所有其他代码
  • Stream Builder 默认 Key 和 Value Serde 配置是什么?

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

问题是builder.table 不知道默认为&lt;Object,Object&gt; 的泛型类型。后来,Serde 类型不匹配。您需要指定类型,如

KTable<Long,String> dataStore = builder.<Long,String>table(
    "example_stream",
    Materialized.as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));

【讨论】:

    【解决方案2】:

    如果没有代码示例,我不能肯定地说,但是错误消息非常清楚。您正在向 Kafka 指定密钥的类型为 Long。但是,您的密钥实际上是其他一些 Java 对象。例如,如果您有一条带有字符串键的消息,则此代码将更改为:.withKeySerde(Serdes.String())。检查密钥的类型并为该类型指定正确的Serde

    【讨论】:

    • 感谢您的回复。我添加了相关代码。我很难找到它的错误。
    • 如果您包含来自example_stream 的示例键:值消息也会很有帮助。
    猜你喜欢
    • 1970-01-01
    • 2016-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多