【问题标题】:Spring Kafka Transaction - Duplicate messages published to topicSpring Kafka Transaction - 发布到主题的重复消息
【发布时间】:2018-06-20 04:03:50
【问题描述】:

我们正在尝试为我们的生产者实现交易。我们的用例我们从 MQ 接收消息并发布到 kafka。当出现故障时,我们需要回滚发布到 kafka 的消息,并且不向 MQ 发送确认。

当我们使用事务时,我们看到消息在 kafka 主题中重复。

@Bean("producerConfig")
public Properties producerConfig() {
    LOGGER.info("Creating Dev Producer Configs");
    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
    configs.put(ProducerConfig.ACKS_CONFIG, "all");
    configs.put(ProducerConfig.RETRIES_CONFIG, 1);
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return configs;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(new HashMap<String, Object>((Map) producerConfig));
    producerFactory.setTransactionIdPrefix("spring-kafka-transaction");
    return producerFactory;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}


@Bean
KafkaTransactionManager<String,String> kafkaTransactionManager(){
    KafkaTransactionManager<String, String> transactionManager = new KafkaTransactionManager<>(producerFactory());
    return transactionManager;
}

监听方法

@Component
public class WMQListener implements MessageListener {

KafkaTemplate<String, String> kafkaTemplate;

@Override
@Transactional
public void onMessage(Message message) {
    String onHandXmlStr = null;
    try {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            onHandXmlStr = textMessage.getText();
        }
        LOGGER.debug("Message Received from WMQ :: " + onHandXmlStr);
        Msg msg = JaxbUtil.convertStringToMsg(onHandXmlStr);
        List<String> onHandList = DCMUtil.convertMsgToList(msg);

        ListenableFuture send = kafkaTemplate.sendDefault(onHandList.get(0));
        send.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
            }

            @Override
            public void onSuccess(Object result) {
                System.out.println(result);
            }
        });
     message.acknowledge();

}

【问题讨论】:

  • 您的@Configuration 课程中有@EnableTransactionManagement 吗?您是否将您的消费者 ISOLATION_LEVEL_CONFIG 更改为 READ_COMMITTED ?如果两者都是,DEBUG 日志(org.apache.kafkaorg.springframework.transaction)能否帮助您了解发生了什么?
  • 是的,我有 @EnableTransactionManagement,但我目前正在测试生产者应用程序并且尚未启动消费者,因为我观察到单个消息的偏移量增加了 2,所以我很担心。今天,我通过在同一主题上启动控制台使用者进行了测试,但没有明确指定 READ COMMITTED,但我只收到消息而不是重复消息。但是,我想知道为什么当我使用 @TransactionKafkaTransactionManager 而没有它时偏移量增加了两倍。

标签: apache-kafka spring-transactions spring-kafka


【解决方案1】:

但是,我想知道为什么偏移量增加了 2

由于 kafka 主题是线性日志(每个分区),回滚消息仍会占用日志中的一个槽(猜测)。

考虑一下...

  • p1.send(tx)(偏移量 23)
  • p2.send(tx)(偏移量 24)
  • p1.rollback
  • p2.commit
  • p1.resend(tx)(偏移量 25)。
  • p1.commit.

我的猜测是,p1 在偏移量 23 处的记录被简单地标记为回滚并且没有发送给消费者(除非在它被写入时处于活动状态,并且带有 read_uncommitted 隔离)。

编辑

我发现有/没有交易的偏移量没有区别

@SpringBootApplication
@EnableTransactionManagement
public class So48196671Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(So48196671Application.class, args);
        Thread.sleep(15_000);
        ctx.close();
        System.exit(0);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> foo.send("bar");
    }

    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> pf) {
        return new KafkaTransactionManager<>(pf);
    }

    @KafkaListener(id = "baz", topics = "so48196671")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println(in + " @ " + offset) ;
    }

    @Component
    public static class Foo {

        @Autowired
        KafkaTemplate<String, String> template;

        @Transactional
        public void send(String out) throws Exception {
            ListenableFuture<SendResult<String, String>> sent = template.send("so48196671", out);
            SendResult<String, String> sendResult = sent.get();
            System.out.println(out + " sent to " + sendResult.getRecordMetadata().offset());
            Thread.sleep(5_000);
        }

    }

}

bar sent to 17
bar @ 17

但是,是的,在失败的情况下,使用了一个额外的插槽......

@SpringBootApplication
@EnableTransactionManagement
public class So48196671Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(So48196671Application.class, args);
        Thread.sleep(15_000);
        ctx.close();
        System.exit(0);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            try {
                foo.send("bar");
            }
            catch (Exception e) {
                //
            }
            foo.send("bar");
        };
    }

    @Bean
    public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> pf) {
        return new KafkaTransactionManager<>(pf);
    }

    @KafkaListener(id = "baz", topics = "so48196671")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println(in + " @ " + offset) ;
    }

    @Component
    public static class Foo {

        private boolean fail = true;

        @Autowired
        KafkaTemplate<String, String> template;

        @Transactional
        public void send(String out) throws Exception {
            ListenableFuture<SendResult<String, String>> sent = template.send("so48196671", out);
            SendResult<String, String> sendResult = sent.get();
            System.out.println(out + " sent to " + sendResult.getRecordMetadata().offset());
            if (fail) {
                fail = false;
                throw new RuntimeException();
            }
        }

    }

}

bar sent to 25
bar sent to 27
bar @ 27

在下一次运行...

bar sent to 29
bar sent to 31
bar @ 31

如果我删除异常,在下一次运行时,我会得到

bar sent to 33
bar sent to 35
bar @ 33
bar @ 35

所以,是的,事务本身似乎在日志中占据了一个位置。

所以这并不是一个重复的消息——你可能可以在他们的设计文档中读到它。

【讨论】:

  • 是的,如果有任何失败并且操作被回滚,这也是如此,但是在正常情况下,它不应该回滚事务并且不应该增加2。
  • 请用示例和日志准确解释您的意思。
  • 对不起,如果我不能提供足够的细节或解释清楚,我正在发送一条没有@TransactionalKafkaTransactionManager 的消息,使用kafkaTemplate.send() 我的偏移量增加了一个跨度>
  • 现在我已将 @Transactional 添加到我的侦听器并再次发送一条消息在这种情况下,偏移量增加了 2,但我没有任何事务回滚或任何类型的失败我理解如果我要发送一条消息,就像您解释的那样 p1.send(tx)(offset 23) 在这种情况下,我应该将偏移量设为 23,但是当我看到偏移量时,它会增加到 24。因为没有任何失败,所以没有回滚应该发生
  • 另外,我观察到我的消费者只收到一条消息。但是,日志偏移量增加了 2,即使我的使用者已启动并正在运行,我总是会看到一条消息的滞后。我在我的消费者中使用 read_committed。
猜你喜欢
  • 1970-01-01
  • 2020-01-18
  • 2016-01-17
  • 1970-01-01
  • 2018-10-27
  • 2020-06-24
  • 1970-01-01
  • 2017-03-05
  • 2023-03-17
相关资源
最近更新 更多