【问题标题】:Spring Boot Kafka newbie question on serializing / deserializing关于序列化/反序列化的 Spring Boot Kafka 新手问题
【发布时间】:2020-01-31 23:39:37
【问题描述】:

我是 Kafka 的新手(使用 Spring Boot 2.2.4),我看到 KafkaTemplate 是字符串的示例,字符串只是发送一个字符串。我正在研究发送 Json 对象,我在那里看到了 2 种不同的方法......有些人正在使用 String、Object,有些人正在使用 String、TheActualModelClass。

两者之间有优缺点吗?我有点假设主要差异是类型化模板仅适用于一个模型,而对象可以将任何类型发送到任何主题?除此之外还有什么?

【问题讨论】:

  • “那个”之外没有什么。如果您发送不同类型的对象,请使用Object(或SomeSuperClass)作为值泛型参数,如果您只发送一种类型,请使用SomeConcreteClass。正如@DeadPoll 在下面所说的那样;你需要一个可以处理多种类型的序列化程序。

标签: java spring spring-boot apache-kafka


【解决方案1】:

虽然我可能会迟到回答它,但它可能对那些正在寻找解决方案的人有用。 详细解决方案可以在https://github.com/CODINGSAINT/kafka-stream-spring查看

想想我们是否有一个自定义的 java bean

public class Quote {
private String content;
private Set<String> tags;
private String author;
}

您需要编写 Kafka Producer 以及 Consumer 配置

 /**
 * Configurations for KafkaStreams
 * @param kafkaProperties Will take defaults from application YAML or Properties file with spring.kafka
 * @return kafkaConfiguration
 */
@Bean(name= KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaConfiguration(final KafkaProperties kafkaProperties){
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, QuoteSerde.class.getName() );
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
    return new KafkaStreamsConfiguration(config);
}

/**
 * The Stream which delegates each incoming topic to respective destination topic
 * @param kStreamsBuilder
 * @return
 */
@Bean
public KStream<String,Quote> kStream(StreamsBuilder kStreamsBuilder){
    KStream<String,Quote> stream=kStreamsBuilder.stream(inputTopic);
    for(String topic:allTopics){
        stream.filter((s, quote) -> quote.getTags().contains(topic)).to(topic);
    }
    return stream;

}

/**
 * Kafka ConsumerFactory configurations
 * @return
 */
@Bean
public ConsumerFactory<String, Quote> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaProperties.getBootstrapServers());
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            BytesDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

/**
 * Required Configuration for POJO to JSON
 * @return ConcurrentKafkaListenerContainerFactory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Quote>
kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Quote> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

然后我们需要一个序列化器

public class QuoteSerializer implements Serializer<Quote> {

    @Override
    public byte[] serialize(String s, Quote quote) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(quote).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }
}

还有一个反序列化器

public class QuoteDeserializer implements Deserializer<Quote> {

    @Override
    public Quote deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        Quote quote = null;
        try {
            quote = mapper.readValue(bytes, Quote.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return quote;
    }
}

同时使用 Serializer 和 Deserializer a Serde

public class QuoteSerde implements Serde<Quote> {
    public QuoteSerde() {
    }

    @Override
    public Serializer<Quote> serializer() {
        return new QuoteSerializer();
    }

    @Override
    public Deserializer<Quote> deserializer() {
        return new QuoteDeserializer();
    }
}

现在我们的听众可以收听了

@Component
public class TopicConsumers {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumers.class);

    @Value("#{'${kafka.topic.output}'.split(',')}")
    private List<String> allTopics;

    /**
     * For simplicity we are listening all topics at one listener
     */

    @KafkaListener(id = "allTopics", topics = "#{'${kafka.topic.output}'.split(',')}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload Quote quote,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String incomingTopic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
    ) {
        LOGGER.info("Incoming quote {}-> {}", incomingTopic, quote);
    }
}

下面是application.yml文件

spring:
  kafka:
    listener:
      missing-topics-fatal: false
    client-id : quotes-app
    bootstrap-server:
      - localhost:9091
      - localhost:9001
      - localhost:9092
    template:
      default-topic: quotes
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.codingsaint.learning.kafkastreamspring.QuoteSerializer
    consumer:
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.codingsaint.learning.kafkastreamspring.QuoteDeserializer
---
kafka:
  topic:
    input: quotes
    output: business,education,faith,famous-quotes,friendship,future,happiness,inspirational,life,love,nature,politics,proverb,religion,science,success,technology

【讨论】:

    【解决方案2】:

    在使用 spring-kafka 库时,我建议使用 spring JsonSerializerJsonDeserializer 以避免大量样板代码,您可以找到有关 Spring Serialization, Deserialization, and Message Conversion 的更多信息

    Apache kafka 只提供了用于序列化和反序列化的高级 API,因此用户需要自定义实现来进行序列化或反序列化

    org.apache.kafka.common.serialization.Serializer<T>
    org.apache.kafka.common.serialization.Deserializer<T>
    

    Apache Kafka 提供了一个高级 API,用于对记录值及其键进行序列化和反序列化。它与带有一些内置实现的 org.apache.kafka.common.serialization.Serializer 和 org.apache.kafka.common.serialization.Deserializer 抽象一起出现。同时,我们可以通过 Producer 或 Consumer 配置属性来指定序列化器和反序列化器类。

    但是Spring-kafka提供JsonSerializerJsonDeserializer基于ObjectMapper

    Spring for Apache Kafka 还提供了基于 Jackson JSON 对象映射器的 JsonSerializer 和 JsonDeserializer 实现。 JsonSerializer 允许将任何 Java 对象编写为 JSON 字节[]。 JsonDeserializer 需要一个额外的 Class targetType 参数以允许将消耗的 byte[] 反序列化为正确的目标对象。

    它还提供了使用Type Mappings@KafkaListener on a Class将不同类型的JSON对象反序列化到各个java POJO类

    【讨论】:

    • Apache 还包括一个使用 Jackson 的 JSONSerializer
    【解决方案3】:

    JSON 只是一个字符串。在代码序列结束时,您的模型对象无论如何都会被序列化为字节

    你更喜欢哪一个取决于你想抽象出多少 Kafka 序列化程序代码

    【讨论】:

      猜你喜欢
      • 2018-02-05
      • 2018-08-11
      • 1970-01-01
      • 1970-01-01
      • 2018-09-25
      • 1970-01-01
      • 2017-10-30
      • 1970-01-01
      • 2021-07-01
      相关资源
      最近更新 更多