【问题标题】:Reactor Kafka: Exactly Once Processing SampleReactor Kafka:恰好一次处理样本
【发布时间】:2019-08-01 08:34:17
【问题描述】:

我读过很多文章,其中有很多不同的配置可以实现一次处理。

这是我的生产者配置:

final Map<String, Object> props = Maps.newConcurrentMap();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all"); 
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-tx-1");

这是我的消费者配置:

final Map<String, Object> props = Maps.newHashMap();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

我读过这个[示例场景][1]

我尝试关注,但遇到了一些问题:

这是我的生产者代码:

    @Override
public Mono<SenderResult<Void>> buy(Message msg) {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    return kafkaProducerTemplate.transactionManager().begin().then(kafkaProducerTemplate.send(mytopic, msg));

}

我的消费者代码:

@Override
public void run(ApplicationArguments arg0) throws Exception {
    final ReactiveKafkaProducerTemplate kafkaProducerTemplate = kafkaConfig.getKafkaProducerTemplate();
    final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = kafkaConfig.getKafkaConsumerTemplate(mytopic, Message.class);

    final Flux<ConsumerRecord<String, Message>> flux = kafkaConsumerTemplate.receiveExactlyOnce(kafkaProducerTemplate.transactionManager())
            .concatMap(receiverRecordFlux -> receiverRecordFlux );

    flux.subscribe(record -> {
        final Message message = record.value();

        System.out.printf("received message: timestamp=%s key=%d value=%s\n",
                dateFormat.format(new Date(record.timestamp())),
                record.key(),
                message);
 transactionService.processAndSendToNextTopic(message)
                .doOnSuccess(aVoid -> kafkaProducerTemplate.transactionManager().commit())
                .subscribe();

    });
}

我在尝试生成和使用消息时总是遇到以下错误:

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

【问题讨论】:

    标签: apache-kafka spring-kafka project-reactor reactive-kafka


    【解决方案1】:

    查看receiveExactlyOnce的javadocs

    /**
     * Returns a {@link Flux} of consumer record batches that may be used for exactly once
     * delivery semantics. A new transaction is started for each inner Flux and it is the
     * responsibility of the consuming application to commit or abort the transaction
     * using {@link TransactionManager#commit()} or {@link TransactionManager#abort()}
     * after processing the Flux. 
    

    begin() 已经被调用,所以你不需要调用它。

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        this.ackMode = AckMode.EXACTLY_ONCE;
        Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
        return  flux.map(consumerRecords -> transactionManager.begin()
                                 .then(Mono.fromCallable(() -> awaitingTransaction.getAndSet(true)))
                                 .thenMany(transactionalRecords(transactionManager, consumerRecords)))
                                 .publishOn(transactionManager.scheduler());
    }
    

    【讨论】:

    • 谢谢!你看到消费者/生产者配置有什么问题吗?另外,发送后我需要关闭生产者吗?
    • 我不熟悉反应式客户端(或反应器,真的),但根据 javadocs,在当前记录完成之前,您不会获得下一个消费者记录流,因此重用制片人应该没问题。我确实注意到的一件事是,如果您有多个分区和多个实例,那么适当的僵尸防护将不起作用,因为要使其正常工作,transactional.id 必须基于topic/partition/group.id。因此,如果一个分区在重新平衡后移动到另一个实例,那么新的消费者使用相同的事务 ID。每个分区都需要一个消费者。
    • 根据我的阅读,“我们通过要求为每个事务生产者分配一个称为 transactional.id 的唯一标识符来解决僵尸实例的问题。这用于在流程重新启动时识别相同的生产者实例。”。我以为我只需要为每个运行生产者的实例设置一个唯一的 id
    • 这对于仅生产者事务来说是正确的,但对于读-进程-写来说却不是一次。明天我会寻找解释这一点的文章。
    • Here is one discussion。但是我现在找不到一篇 Confluent 文章。
    猜你喜欢
    • 1970-01-01
    • 2017-01-14
    • 2020-10-04
    • 2020-05-12
    • 2022-09-23
    • 1970-01-01
    • 2020-05-04
    • 2012-11-14
    • 2018-07-15
    相关资源
    最近更新 更多