【Question Title】: How to add headers to Kafka message?
【Posted】: 2020-08-18 08:18:33
【Question】:

I need some extra headers for my own purposes, such as logging. I am a bit confused about how to add them; I believe they should be attached at the point where I actually call send(). How can I set custom headers on my Kafka messages in a Spring app, given that I currently send them like this:

final var streamMessage = StreamMessage
            .builder()
            .payload(Event.builder().eventId(eventId).result(result).build())
            .build();

    ListenableFuture<SendResult<String, StreamMessage<?>>> future = kafkaTemplate.send(TOPIC, streamMessage);
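One way to attach headers with this send style, sketched under the assumption that spring-kafka and spring-messaging are on the classpath: KafkaTemplate also accepts a spring-messaging Message&lt;?&gt;, and its message headers are mapped onto Kafka record headers. The header name "x-request-id" below is purely illustrative:

```java
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class HeaderSendSketch {

    public static Message<String> buildMessage(String topic, String payload) {
        // Custom headers set here are mapped onto the Kafka record headers by the template
        return MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.TOPIC, topic)   // routing header required by send(Message)
                .setHeader("x-request-id", "req-42")    // illustrative custom header
                .build();
    }
}
```

The resulting message would then be sent with kafkaTemplate.send(message) instead of kafkaTemplate.send(TOPIC, streamMessage).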

My Kafka configuration looks like this:

@Configuration
public class KafkaConfiguration {

    private final Map<String, Object> producerProps;
    private final Map<String, Object> consumerProps;

    @Autowired
    public KafkaConfiguration(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
        this.producerProps = producerProps(bootstrapServers);
        this.consumerProps = consumerProps(bootstrapServers);
    }

    private Map<String, Object> producerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return props;
    }

    private Map<String, Object> consumerProps(String bootstrapServers) {
        final Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return props;
    }

    @Bean
    public ConsumerFactory<String, StreamMessage<?>> consumerFactory() {

        JsonDeserializer<StreamMessage<?>> deserializer = new JsonDeserializer<>(StreamMessage.class);

        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(consumerProps,
                new StringDeserializer(),
                deserializer);
    }

    @Bean
    public ProducerFactory<String, StreamMessage<?>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    @Bean
    public KafkaTemplate<String, StreamMessage<?>> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> listenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, StreamMessage<?>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

【Comments】:

Tags: spring apache-kafka


【Solution 1】:

The last answer, by Pim (Adding custom header using Spring Kafka), worked for me:

I was looking for an answer myself when I stumbled upon this question. However, I am using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper did not seem relevant.

This is how I add a custom header:

var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);
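Since constructing a ProducerRecord does not touch the broker, header logic like this can be unit-tested locally. A small self-contained sketch using only kafka-clients (the helper name is mine, not from the answer):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderSketch {

    public static String lastHeaderValue(ProducerRecord<String, String> record, String key) {
        // lastHeader returns the most recently added header with that key, or null if absent
        var header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```

For example, after record.headers().add("foo", "bar".getBytes()), calling lastHeaderValue(record, "foo") returns "bar".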

Now, to read the headers (before consuming), I added a custom interceptor.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
        
        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
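Note that Header.value() returns raw byte[], so inside interceptRecordsFromPartition the values typically need decoding before they are logged or acted upon. A hedged helper sketch (kafka-clients only; the charset convention is an assumption, pick one and use it on both producer and consumer):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;

public class HeaderDecoding {

    public static String decode(Header header) {
        // Kafka stores header values as opaque bytes; decode with the same charset the producer used
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```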

The final step is to register this interceptor with the Kafka consumer container, using the following (Spring Boot) configuration:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

}
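An alternative worth mentioning, as an assumption on my part rather than part of the original answer: if the only goal is to inspect headers before the listener runs, Spring Kafka (2.2.7 and later) also offers a RecordInterceptor that can be set directly on the listener container factory, avoiding a separate Kafka-level ConsumerInterceptor class. A config sketch:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class ListenerInterceptorConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory);
        // Runs for every record before the listener is invoked
        factory.setRecordInterceptor(record -> {
            var header = record.headers().lastHeader("MyHeader");
            // inspect or log the header here as needed
            return record; // returning null would skip the listener for this record
        });
        return factory;
    }
}
```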

【Discussion】:
