[Posted]: 2021-04-01 08:43:07
[Question description]:
When I introduce a groupBy on my KStream, I run into serialization errors while the KStream is converted into a KTable. As I understand it, as soon as there is an aggregate or reduce on a KStream, Kafka Streams turns it into a KTable and, because of the repartitioning (shuffle) this requires, it has to serialize the records again. My original KStream simply maps records from JSON to Avro, like this, and it works fine:
@StreamListener("notification-input-channel")
@SendTo("notification-output-avro-channel")
public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
    log.info("received PosInvoice JSON: {}", input);
    KStream<String, NotificationAvro> notificationAvroKStream = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .mapValues(v -> recordBuilder.getNotificationAvro(v));
    notificationAvroKStream.foreach((k, v) ->
            log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
    return notificationAvroKStream;
}
Then I introduced groupByKey and reduce, and I realized this converts the stream to a KTable, so it needs Serdes. Unfortunately I cannot rely on the default Serdes, because the application uses other serialization types as well. So I decided to set the Serdes explicitly on the KTable topology. I am trying to implement the solution based on this answer.
The part of my code that tries to use the custom serdes does not work: Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()). At first I thought I would not need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but it does not work without it either, and I cannot find a Materialized variant that is not KeyValueBytes... where I could define my serdes CustomSerdes.String() and CustomSerdes.NotificationAvro().
The answer I linked also uses final StreamsBuilder builder = new StreamsBuilder();, but since I am building this with spring-kafka I don't have that option, or if I do, I don't know how to use it.
@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {

    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) ->
                log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );

        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) ->
                log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));
        return notificationAggAvroKStream;
    }
}
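To the question in my own comments above: as far as I can tell, the store supplier is always byte-based (KeyValueBytesStoreSupplier), because the state store itself holds bytes; the typed serdes are attached to the Materialized rather than to the supplier. A sketch of what I mean (assuming the CustomSerdes class below, not verified in my setup):

```java
// Sketch: the supplier stays byte-based; the key/value serdes hang off Materialized.
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
Materialized<String, NotificationAvro, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, NotificationAvro>as(storeSupplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(CustomSerdes.NotificationAvro());
```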
The custom serdes:
@Service
public class CustomSerdes extends Serdes {

    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig =
            Collections.singletonMap("schema.registry.url", schema_registry_url);

    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}
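As a side note, since both methods I use are static, the class probably needs neither the @Service stereotype nor extends Serdes (the inheritance is only there so that CustomSerdes.String() resolves to the inherited Serdes.String()). A plain factory sketch with the same assumed behavior:

```java
// Sketch: plain static factory, no Spring stereotype or inheritance needed,
// since the serde methods are static anyway.
public final class CustomSerdes {

    private static final Map<String, String> SERDE_CONFIG =
            Collections.singletonMap("schema.registry.url", "http://localhost:8081");

    private CustomSerdes() {
    }

    public static Serde<String> String() {
        return Serdes.String();  // delegate instead of inheriting
    }

    public static Serde<NotificationAvro> NotificationAvro() {
        Serde<NotificationAvro> serde = new SpecificAvroSerde<>();
        serde.configure(SERDE_CONFIG, false);  // false = configure as a value serde
        return serde;
    }
}
```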
And the error:
Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
... ...
Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module of loader 'bootstrap' java.base)
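If I read the trace correctly, Change/ChangedSerializer is the internal wrapper Kafka Streams uses for the repartition topic that KTable#groupBy creates, and it wraps whatever value serde that repartition was given; here it seems to fall back to a default String serde, hence the cast failure. A hedged sketch of the groupBy overload that takes explicit serdes via Grouped (untested in my setup; the reducers are placeholders for the loyalty-points logic above):

```java
// Sketch: Grouped supplies the serdes for the repartition topic that
// KTable#groupBy creates, so the internal Change wrapper serializes
// NotificationAvro instead of falling back to the default serde.
KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
        .toTable(Materialized.with(Serdes.String(), CustomSerdes.NotificationAvro()))
        .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro),
                 Grouped.with(Serdes.String(), CustomSerdes.NotificationAvro()))
        .reduce(
                (aggValue, newValue) -> newValue,   // placeholder adder
                (aggValue, oldValue) -> oldValue,   // subtractor
                Materialized.with(Serdes.String(), CustomSerdes.NotificationAvro()));
```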
Tags: apache-kafka apache-kafka-streams spring-kafka