【问题标题】:Java Spring Kafka Template producer lost messages on broker restartJava Spring Kafka 模板生产者在代理重启时丢失了消息
【发布时间】:2021-03-17 05:02:26
【问题描述】:

我正在使用 spring-boot (2.1.6.RELEASE) 和 spring-kafka (2.2.7.RELEASE),并且我正在使用 KafkaTemplate 向我的 kafka 集群发送消息。但有时(通常是当我重新启动 kafka-broker 或进行重新平衡时)我在发送消息时会看到这样的错误:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

由于默认的 Kafka 生产者配置,我希望发送失败会被重试,但事实并非如此。默认 Kafka 生产者配置:

retries: 2147483647  (https://kafka.apache.org/documentation/#retries)
acks: 1               (https://kafka.apache.org/documentation/#acks)

我的配置是这样的:

@Bean
    public Map<String, Object> producerConfigs()
    {
        // See https://kafka.apache.org/documentation/#producerconfigs for more properties
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return props;
    }

    @Bean
    public ProducerFactory<Long, String> producerFactory()
    {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<Long, String> kafkaTemplate(KafkaTemplateProducerListener<Long, String> kafkaTemplateProducerListener,
                                                     ProducerFactory<Long, String> producerFactory)
    {
        KafkaTemplate<Long, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setProducerListener(kafkaTemplateProducerListener);
        return kafkaTemplate;
    }

我正在发送这样的消息:

kafkaTemplate.send(topicName, key, body);

我在整个互联网上进行了搜索,每个人都说这种带有重试和确认的配置必须有效,但它没有。我错过了什么?

谢谢

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka kafka-producer-api


    【解决方案1】:

    经过一段时间的调试,我找到了解决方案:

    props.put(ProducerConfig.ACKS_CONFIG, "all");
    

    有关此属性的更多信息:https://kafka.apache.org/documentation/#acks


    非常好的博客,展示了您可能在 kafka 中丢失消息的不同场景:



    旁注 - 来自this 答案我发现如果您不想在关机时丢失消息,那么使用它是个好主意:

    @PreDestroy
    public void flush()
    {
        kafkaTemplate.flush();
    }
    

    【讨论】:

      猜你喜欢
      • 2019-08-10
      • 2018-09-15
      • 2017-02-28
      • 2019-05-14
      • 1970-01-01
      • 2017-01-04
      • 1970-01-01
      • 1970-01-01
      • 2020-11-02
      相关资源
      最近更新 更多