【发布时间】:2021-03-26 13:24:13
【问题描述】:
我似乎无法将主题的序列化程序覆盖为Serdes.String()。我正在尝试一个从主题(流)读取并写入 KTable 的简单用例。到目前为止我所拥有的:
@Component
class Processor {
@Autowired
public void process(final StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
.filter((key, value) -> value.contains("ACTION"))
.toTable(Materialized.as("output_table_materialized"))
.toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
}
}
我得到的例外是:
org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
据我所知,它知道消息是String,但它使用的是默认解串器ByteArraySerializer。我在上面的代码中哪里出错了?
【问题讨论】:
标签: apache-kafka apache-kafka-streams spring-kafka ktable