【Question Title】: How to use custom serializers with KTable in Kafka Streams?
【Posted】: 2021-04-01 08:43:07
【Question】:

When I introduce a `groupBy` on my `KStream`, I run into errors while serializing the `KStream` into a `KTable`. As I understand it, as soon as there is an `aggregate` or `reduce` on a `KStream`, Kafka Streams converts it into a `KTable` and, because of the required shuffle (repartitioning), has to serialize the records again. My original `KStream`, which only maps records from JSON to Avro like this, works fine:

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .mapValues(v -> recordBuilder.getNotificationAvro(v));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
        return notificationAvroKStream;
    }

Then I introduced `groupByKey` and `reduce`, and I realized that this converts the stream into a `KTable`, so it needs Serdes for the repartition topic. Unfortunately I cannot configure default Serdes, because I have other serialization types as well, so I decided to pass the Serdes to the `KTable` topology directly. I am trying to implement the solution based on this answer.

The part where I try to use custom serdes does not work (`Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())`). At first I thought I did not need `KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");`, but it does not work without it either, and I cannot find a `Materialized` that is not `KeyValueBytes...` where I can define my serdes `CustomSerdes.String(), CustomSerdes.NotificationAvro()`.

According to the answer I linked, they also use `final StreamsBuilder builder = new StreamsBuilder();`. But since I am building this with spring-kafka (Spring Cloud Stream), I don't have that option, or if I do, I don't know how to use it.
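As a side note: with the Spring Cloud Stream Kafka Streams binder, serdes can often be supplied per binding in the application configuration instead of through a `StreamsBuilder`. A minimal sketch, assuming a binder version of this era (the binding names come from this question; the exact property layout depends on the binder release):

```yaml
# application.yml - a sketch, not a verified configuration
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-output-avro-channel:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
```

This only covers the input/output bindings, though; serdes for internal repartition and changelog topics still have to be given in the topology itself (e.g. via `Grouped` or `Materialized`).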

@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {
    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );
        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));

        return notificationAggAvroKStream;
    }
}

The custom serdes:

@Service
public class CustomSerdes extends Serdes {
    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);
    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}
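One detail worth noting: when Confluent's `SpecificAvroSerde` is also used for deserialization, it usually needs `specific.avro.reader=true`, otherwise it deserializes to `GenericRecord` instead of the generated class. A hedged sketch (the extra property comes from Confluent's serde documentation, not from the question):

```java
// A sketch, assuming Confluent's SpecificAvroSerde is on the classpath.
// "specific.avro.reader" makes deserialization produce the generated
// NotificationAvro class rather than a GenericRecord.
final Map<String, String> serdeConfig = Map.of(
        "schema.registry.url", "http://localhost:8081",
        "specific.avro.reader", "true");
final Serde<NotificationAvro> serde = new SpecificAvroSerde<>();
serde.configure(serdeConfig, false); // false = configure as a value serde
```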

And the error:

    Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1"
    org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic
    NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition.
    A serializer (key: org.apache.kafka.common.serialization.StringSerializer /
    value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible
    to the actual key or value type (key type: java.lang.String /
    value type: org.apache.kafka.streams.kstream.internals.Change).
    Change the default Serdes in StreamConfig or provide correct Serdes via method parameters
    (for example if using the DSL, #to(String topic, Produced<K, V> produced) with
    Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
    ...
    Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro
    cannot be cast to class java.lang.String
    (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app';
    java.lang.String is in module java.base of loader 'bootstrap')

【Comments】:

    Tags: apache-kafka apache-kafka-streams spring-kafka


    【Solution 1】:

    So, I solved this by reading this answer, which uses the now-deprecated `.groupByKey(Serialized.with(...))`. Instead, I am using `Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())`:

    KStream<String, NotificationAvro> notificationAvroKStream = input
         .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
         .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
         .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
         .reduce((aggValue, newValue) -> {
              newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
              return newValue;
         })
         .toStream();
    notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
    return notificationAvroKStream;
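
    For completeness, the `Materialized` route from the question can also be made to work; the catch is that `Materialized.as(...)` is typed over `KeyValueStore<Bytes, byte[]>` (the underlying store is always byte-based, and the serdes sit on top of it), so the generics have to be spelled out explicitly. A hedged sketch, reusing the names from the question (the store name `notification-agg-store` is made up for illustration):

    ```java
    // A sketch, assuming the question's CustomSerdes and notificationAvroKStream.
    // The store type parameter is always KeyValueStore<Bytes, byte[]>; the custom
    // serdes are layered on it via withKeySerde/withValueSerde.
    KTable<String, NotificationAvro> table = notificationAvroKStream
            .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .reduce(
                    (aggValue, newValue) -> {
                        newValue.setTotalLoyaltyPoints(
                                newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                        return newValue;
                    },
                    Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-agg-store")
                            .withKeySerde(CustomSerdes.String())
                            .withValueSerde(CustomSerdes.NotificationAvro()));
    ```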
    

    【Discussion】:
