【问题标题】:Kafka transactional producer writes messages even if producer commit in not invoked即使未调用生产者提交,Kafka 事务性生产者也会写入消息
【发布时间】:2020-12-26 21:36:32
【问题描述】:

对我来说,kafka 事务生产者的行为似乎与普通生产者一样,消息在主题上可见,因为每条消息都调用了 send。也许我错过了一些基本的东西。我希望消息仅在调用生产者提交方法后才会出现在主题中。在我下面的代码中,produce.commitTransactions() 被注释掉了,但我仍然收到主题中的消息。感谢您的任何指点。

  public static void main(String[] args) {
        try {
            Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(producerConfig);

            producer.initTransactions(); //initiate transactions
            try {
                producer.beginTransaction(); //begin transactions
                for (Integer i = 0; i < 1000; i++) {
                    producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));

                }
                // producer.commitTransaction(); //commit

            } catch (KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }

            producer.close();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }

【问题讨论】:

    标签: apache-kafka transactions kafka-consumer-api kafka-producer-api


    【解决方案1】:

    当谈到 Kafka 中的事务时,您需要考虑一对生产者/消费者。正如您所观察到的,生产者本身只是生产数据并提交或不提交事务。

    只有在与消费者交互时,您才能通过将 KafkaConsumer 配置 isolation.level 设置为 read_committed(默认设置为 read_uncommitted)来“完成”交易。此配置描述为:

    isolation.level:控制如何读取以事务方式编写的消息。如果设置为 read_committed,consumer.poll() 将只返回已提交的事务消息。如果设置为 read_uncommitted'(默认值),consumer.poll() 将返回所有消息,甚至是已中止的事务消息。在任一模式下,非事务性消息都将无条件返回。

    【讨论】:

      猜你喜欢
      • 2018-02-08
      • 2019-01-15
      • 1970-01-01
      • 2020-08-24
      • 2020-02-09
      • 1970-01-01
      • 2021-03-14
      • 2020-01-19
      • 2018-04-11
      相关资源
      最近更新 更多