【问题标题】:kafka Avro message deserializer for multiple topics用于多个主题的 kafka Avro 消息反序列化器
【发布时间】:2019-11-17 18:35:16
【问题描述】:

我正在尝试以 avro 格式反序列化 kafka 消息,我正在使用以下代码: https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java

上面的代码作为单个主题对我来说很好,但我必须监听来自多个主题的消息并创建多个 AvroGenerated 文件,但我坚持配置,因为 confiration 需要多个 avro 类型的对象。请考虑以下问题:

https://github.com/ivangfr/springboot-kafka-debezium-ksql/issues/3

【问题讨论】:

    标签: java spring-boot apache-kafka avro confluent-schema-registry


    【解决方案1】:

    在您的配置中使用以下行:

    props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
    props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
    

    代替:

    props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
    props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );
    

    最终代码为:

    package com.moglix.netsuite.kafka;
    
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @EnableKafka
    @Configuration
    public class ReviewsConsumerConfig
    {
    
    @Value( "${kafka.bootstrap-servers}" )
    private String bootstrapServers;
    
    @Value( "${kafka.schema-registry-url}" )
    private String schemaRegistryUrl;
    
    @Value( "${kafka.reviews.start-offset}" )
    private String orderStartOffset;
    
    @Value( "${kafka.reviews.max-poll-records}" )
    private Integer maxPollRecords;
    
    @Bean
    public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory1() );
        factory.setBatchListener( true );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        factory.getContainerProperties().setSyncCommits( true );
        return factory;
    }
    
    @Bean
    public <T> ConsumerFactory<String, T> consumerFactory1()
    {
        return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );
    }
    
    @Bean
    public Map<String, Object> consumerConfigs1()
    {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
        props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );
    
        props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );
        props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
        props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());// This is main line for my problem solution
        //props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
    
        props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );
        props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        return props;
    }
    
    }
    

    【讨论】:

      猜你喜欢
      • 2019-07-12
      • 2019-09-18
      • 1970-01-01
      • 2019-10-09
      • 1970-01-01
      • 2020-12-13
      • 2019-08-30
      • 2018-08-11
      • 1970-01-01
      相关资源
      最近更新 更多