【问题标题】:How to create Custom serializer in kafka?如何在 kafka 中创建自定义序列化程序?
【发布时间】:2017-03-02 10:24:06
【问题描述】:

只有少数可用的序列化程序,例如,

org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?

【问题讨论】:

    标签: java apache-kafka kafka-producer-api


    【解决方案1】:

    这里有一个示例,您可以将自己的序列化器/反序列化器用于 Kafka 消息值。对于 Kafka 消息密钥是一样的。

    我们希望将 MyMessage 的序列化版本作为 Kafka 值发送,然后在消费者端将其再次反序列化为 MyMessage 对象。

    在生产者端序列化 MyMessage。

    您应该创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序类

    serialize() 方法完成这项工作,接收您的对象并将序列化版本作为字节数组返回。

    public class MyValueSerializer implements Serializer<MyMessage>
    {
        private boolean isKey;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey)
        {
            this.isKey = isKey;
        }
    
        @Override
        public byte[] serialize(String topic, MyMessage message)
        {
            if (message == null) {
                return null;
            }
    
            try {
    
                (serialize your MyMessage object into bytes)
    
                return bytes;
    
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error serializing value", e);
            }
        }
    
        @Override
        public void close()
        {
    
        }
    }
    
    final IntegerSerializer keySerializer = new IntegerSerializer();
    final MyValueSerializer myValueSerializer = new MyValueSerializer();
    final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
    
    int messageNo = 1;
    int kafkaKey = messageNo;
    MyMessage kafkaValue = new MyMessage();
    ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
    producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
    

    在消费者端反序列化 MyMessage。

    您应该创建一个实现 org.apache.kafka.common.serialization.Deserializer 的反序列化器类

    deserialize() 方法完成工作,接收序列化值作为字节数组并返回您的对象。

    public class MyValueDeserializer implements Deserializer<MyMessage>
    {
        private boolean isKey;
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey)
        {
            this.isKey = isKey;
        }
    
        @Override
        public MyMessage deserialize(String s, byte[] value)
        {
            if (value == null) {
                return null;
            }
    
            try {
    
                (deserialize value into your MyMessage object)
    
                MyMessage message = new MyMessage();
                return message;
    
            } catch (IOException | RuntimeException e) {
                throw new SerializationException("Error deserializing value", e);
            }
        }
    
        @Override
        public void close()
        {
    
        }
    }
    

    然后像这样使用它:

    final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
    final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
    final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
    
    ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
    for (ConsumerRecord<Integer, MyMessage> record : records) {
    
        int kafkaKey = record.key();
        MyMessage kafkaValue = record.value();
    
        ...
    }
    

    【讨论】:

    • final KafkaConsumer consumer = new KafkaConsumer(props, keyDeserializer, myValueDeserializer);
    • 上面说的不是语法,那kafka怎么知道反序列化器
    • Deserializer 是构造函数的第三个参数:myValueDeserializer。所有这些代码都取自工作代码,只是更改了一些名称。
    • 为什么在configure()中保存“isKey”?你能解释一下 configure() 和 close() 什么时候不应该是空方法吗?
    • @user1879313 对于这段代码,没有理由,但是例如 Confluent Serializers 使用布尔字段在 Schema Registry 客户端上执行不同的逻辑,随后在 close( ) 方法。
    【解决方案2】:

    没有文字,只有代码

    1. 您发送给 Kafka 的一些对象

      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      import lombok.ToString;
      
      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      @ToString
      public class TestDto {
      
          private String name;
          private String version;
      
      }
      
    2. 创建序列化器,生产者将使用该序列化器

      @Slf4j
      public class KafkaValueSerializer implements Serializer<TestDto> {
      
          private ObjectMapper objectMapper = new ObjectMapper();
      
          @Override
          public void configure(Map<String, ?> configs, boolean isKey) {
          }
      
          @Override
          public byte[] serialize(String topic, TestDto data) {
              try {
                  return objectMapper.writeValueAsBytes(data);
              } catch (JsonProcessingException e) {
                  log.error("Unable to serialize object {}", data, e);
                  return null;
              }
          }
      
          @Override
          public void close() {
          }
      }
      
    3. 当然,不要忘记 Deserializer for Consumer

      @Slf4j
      public class KafkaValueDeserializer implements Deserializer<TestDto> {
      
          private ObjectMapper objectMapper = new ObjectMapper();
      
          @Override
          public void configure(Map<String, ?> configs, boolean isKey) {
          }
      
          @Override
          public TestDto deserialize(String topic, byte[] data) {
              try {
                  return objectMapper.readValue(new String(data, "UTF-8"), TestDto.class);
              } catch (Exception e) {
                  log.error("Unable to deserialize message {}", data, e);
                  return null;
              }
          }
      
          @Override
          public void close() {
          }
      }
      
    4. 最后一刻,将序列化器/反序列化器添加到 application.yml

      spring:
          kafka:
            bootstrap-servers:  192.168.192.168:9092
            producer:
                value-serializer: com.package.service.kafka.KafkaValueSerializer
            consumer:
                group-id: groupId
                value-deserializer: com.package.service.kafka.KafkaValueDeserializer
      

    就是这样。不需要任何配置文件或手鼓跳舞:)

    1. 发送

      KafkaTemplate<String, TestDto> kafkaTemplate;
      
      TestDto test = new TestDto("test name", "test-version");
      kafkaTemplate.send(topic, testDto);
      
    2. @KafkaListener(topics = "${ktp-agent.kafka.request-topic}", groupId = "${spring.kafka.consumer.group-id}")
      public void listen(TestDto message) {
      
          log.info("Received message '{}' from Kafka.", message.toString());
      }
      

    【讨论】:

    • Spring Kafka 已经有一个 JSON 序列化器和反序列化器,所以可能可以从这个答案中删除它们
    • 是否可以通过 kafka 发送 Map 对象?
    • 可以将Map转换成Json,发送json作为例子
    • 有一个问题:如果我们没有传入消息对象结构怎么办?那么我们如何在这种情况下编写反序列化器。 ?
    • 如果您不知道消息的外观,则无法反序列化。
    【解决方案3】:

    您必须创建自己的序列化程序来实现接口Serializer (org.apache.kafka.common.serialization.Serializer),然后将生产者选项key.serializer / value.serializer 设置为它。

    【讨论】:

      猜你喜欢
      • 2019-11-27
      • 1970-01-01
      • 2012-12-27
      • 2021-07-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-07
      • 1970-01-01
      相关资源
      最近更新 更多