【问题标题】:Overriding KStreams default serializer (ByteArraySerializer)覆盖 KStreams 默认序列化程序 (ByteArraySerializer)
【发布时间】:2021-03-26 13:24:13
【问题描述】:

我似乎无法将主题的序列化程序覆盖为Serdes.String()。我正在尝试一个从主题(流)读取并写入 KTable 的简单用例。到目前为止我所拥有的:

@Component
class Processor {
    @Autowired
    public void process(final StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
                .filter((key, value) -> value.contains("ACTION"))
                .toTable(Materialized.as("output_table_materialized"))
                .toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line

    }
}

我得到的例外是:

org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

据我所知,它知道消息是String,但它使用的是默认解串器ByteArraySerializer。我在上面的代码中哪里出错了?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-kafka ktable


    【解决方案1】:

    Consumed.with 将是一个De序列化器。

    错误出现在 Serializer 或 toTable 调用上,您可以添加 Produced.with 或修改您的应用程序属性以在那里配置默认值

    【讨论】:

    • 谢谢!做了一些测试,得出了同样的结论。所以我然后尝试将 KTable 重新转换回流并在那里写入主题;但是我得到了同样的例外。如果您不介意再看一遍,我已经编辑了问题!
    • KTable 仍然需要类型
    • 不确定我是否理解... toTable() 似乎没有 Serde 参数。我什至尝试先创建KTable&lt;String, String&gt; 变量然后转换它(尽管更多是出于绝望)。我也没有在他们的文档中看到任何提及这一点,一直在关注:kafka-tutorials.confluent.io/kafka-streams-convert-to-ktable/…
    • 物化对象在那个链接中也有一个 serde 参数kafka.apache.org/25/javadoc/org/apache/kafka/streams/kstream/…,他们确实设置了StreamsConfig.DEFAULT_*_SERDE_CLASS_CONFIG...你没有那个吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-10-24
    • 1970-01-01
    • 2018-11-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多