【Question Title】: Exactly-once semantics with Spring Kafka
【发布时间】:2020-10-04 13:48:45
【Question Description】:

I'm trying to test my exactly-once configuration to make sure all the properties I set are correct and the behavior is what I expect.

I seem to be running into a problem with duplicate sends.

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();
        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage(new TopicPartition("someTopic2", cr.partition()),
                                new OffsetAndMetadata(cr.offset() + 1), "something1", "im in transaction", cr.key());
                        acknowledgment.acknowledge();
                    }
                }).build();
        consumer.start();
    }

This is my "test"; you can assume the builders apply the correct configuration.

ConsumerMessageLogic is a class that handles the "process" part of the read-process-write cycle that exactly-once semantics supports.

In the producer class I have a sendMessage method that looks like this:

    public void sendMessage(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata, String sendToTopic, V message, PK partitionKey) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicPartition.topic(), partitionKey, message);
            if (kafkaTemplate.getProducerFactory().transactionCapable()) {
                kafkaTemplate.executeInTransaction(operations -> {
                    sendMessage(message, partitionKey, sendToTopic, partitionAndMessagePair, operations);
                    operations.sendOffsetsToTransaction(
                            Map.of(topicPartition, offsetAndMetadata), "bla");
                    return true;
                });
            } else {
                sendMessage(message, partitionKey, topicPartition.topic(), partitionAndMessagePair, kafkaTemplate);
            }
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

I create my consumer like this:

    /**
     * Start the message consumer
     * The record event will be delegated to onMessage()
     */
    public void start() {
        initConsumerMessageListenerContainer();
        container.start();
    }

    /**
     * Initialize the kafka message listener
     */
    private void initConsumerMessageListenerContainer() {
        // start an acknowledging message listener to allow manual commits
        messageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(messageListener);

        // sets the number of consumers; the topic partitions will be divided between them
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener));
    }

When I create my producer, I create it with a UUID as the transaction prefix, like this:

    public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
        ProducerFactory<PK, V> res = new DefaultKafkaProducerFactory<>(props);
        if (isTransactional) {
            ((DefaultKafkaProducerFactory<PK, V>) res).setTransactionIdPrefix(UUID.randomUUID().toString());
            ((DefaultKafkaProducerFactory<PK, V>) res).setProducerPerConsumerPartition(true);
        }
        return res;
    }

Now with everything set up, I ran 2 instances against a topic with 2 partitions, so each instance got 1 partition of the consumed topic.

I send a message and, in debug, wait out the transaction timeout (simulating a connection loss) on instance A. Once the timeout passes, the other instance (instance B) automatically processes the record and sends it to the target topic, after a rebalance occurs.

So far so good. Now, when I release the breakpoint on instance A, it says it is rebalancing and cannot commit, but I still see an extra output record in the target topic.

My expectation was that once I released the breakpoint, instance A would not continue its work, since the record had already been processed.

Am I doing something wrong? Or can this scenario even be achieved?

Edit 2:

Following Gary's comment about executeInTransaction: if I freeze one of the instances until the timeout and release it after the other instance has processed the record, I get a duplicate record, because the frozen instance then processes and produces the same record to the output topic...

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();

        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage("something1", "im in transaction");
                    }
                }).build();
        consumer.start(producer.getProducerFactory());
    }

The new sendMessage method in the producer, without executeInTransaction:

    public void sendMessage(V message, PK partitionKey, String topicName) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicName, partitionKey, message);
            sendMessage(message, partitionKey, topicName, partitionAndMessagePair, kafkaTemplate);
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

And I changed the consumer container creation to use a transaction manager with the same producer factory, as suggested:

    /**
     * Initialize the kafka message listener
     */
    private void initConsumerMessageListenerContainer(ProducerFactory<PK, V> producerFactory) {
        // start an acknowledging message listener to allow manual commits
        acknowledgingMessageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(acknowledgingMessageListener, producerFactory);

        // sets the number of consumers; the topic partitions will be divided between them
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener, producerFactory));
    }

    @NonNull
    private ContainerProperties containerProperties(MessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        ContainerProperties containerProperties = new ContainerProperties(topics);
        containerProperties.setMessageListener(messageListener);
        containerProperties.setTransactionManager(new KafkaTransactionManager<>(producerFactory));
        return containerProperties;
    }

My expectation was that once the broker received the processed record from the frozen instance, it would know the record had already been handled by another instance, since it contains exactly the same metadata (or does it? I mean, the PID will be different, but should it be?).

Maybe the scenario I'm looking for isn't even supported by what Kafka and Spring currently provide...

If I have 2 read-process-write instances, that means I have 2 producers with 2 different PIDs.

Now when I freeze one of the instances, and the unfrozen instance takes over responsibility for the record because of the rebalance, it sends the record with its own PID and sequence number in the metadata.

And when I release the frozen instance, it sends the same record but with its own PID, so there is no way the broker can know it's a duplicate...

Am I wrong? How can I avoid this scenario? I thought the rebalance would stop the instance and not let it finish its processing (where it produces the duplicate record), since it is no longer responsible for that record.
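This reasoning can be illustrated with a small standalone sketch. It mirrors the transactional.id layout visible in the logs below (prefix + group.topic.partition); buildTxId is a hypothetical helper for illustration, not a spring-kafka API:

```java
import java.util.UUID;

public class TxIdDemo {
    // Hypothetical helper mirroring the transactional.id layout seen in the
    // logs: <transactionIdPrefix><group.id>.<topic>.<partition>
    static String buildTxId(String prefix, String groupId, String topic, int partition) {
        return prefix + groupId + "." + topic + "." + partition;
    }

    public static void main(String[] args) {
        // Each instance picks its own random prefix, as in the question's code...
        String txIdA = buildTxId(UUID.randomUUID().toString(), "bla", "someTopic2", 0);
        String txIdB = buildTxId(UUID.randomUUID().toString(), "bla", "someTopic2", 0);

        // ...so the two producers for the SAME partition register DIFFERENT
        // transactional.ids, get independent PIDs, and never fence each other.
        System.out.println(txIdA.equals(txIdB)); // false

        // With a prefix shared by all instances, the ids collide as intended:
        // the broker bumps the epoch and fences the older (zombie) producer.
        String txId1 = buildTxId("myApp.tx.", "bla", "someTopic2", 0);
        String txId2 = buildTxId("myApp.tx.", "bla", "someTopic2", 0);
        System.out.println(txId1.equals(txId2)); // true
    }
}
```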

Adding logs. The frozen instance: you can see the freeze at 10:53:34, and I released it at 10:54:02 (the rebalance timeout is 10 seconds):

2020-06-16 10:53:34,393 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
Created new Producer: CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]
2020-06-16 10:53:34,394 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] 
beginTransaction()
2020-06-16 10:53:34,395 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created 
Kafka transaction on producer [CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]]
2020-06-16 10:54:02,157 INFO  [${sys:spring.application.name}] [kafka- 
coordinator-heartbeat-thread | bla] 
[o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1,      
groupId=bla] Group coordinator X.X.X.X:9992 (id: 2147482646 rack: 
null) is unavailable or invalid, will attempt rediscovery
2020-06-16 10:54:02,181 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Sending offsets to transaction: {someTopic2- 
0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2020-06-16 10:54:02,189 INFO  [${sys:spring.application.name}] [kafka- 
producer-network-thread | producer-b76e8aba-8149-48f8-857b- 
a19195f5a20abla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent 
message=[im in transaction] with offset=[252] to topic something1
2020-06-16 10:54:02,193 INFO  [${sys:spring.application.name}] [kafka- 
producer-network-thread | producer-b76e8aba-8149-48f8-857b- 
a19195f5a20abla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] 
[Producer clientId=producer-b76e8aba-8149-48f8-857b- 
a19195f5a20abla.someTopic2.0, transactionalId=b76e8aba-8149-48f8-857b- 
a19195f5a20abla.someTopic2.0] Discovered group coordinator 
X.X.X.X:9992 (id: 1001 rack: null)
2020-06-16 10:54:02,263 INFO  [${sys:spring.application.name}] [kafka- 
coordinator-heartbeat-thread | bla] 
[o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, 
groupId=bla] Discovered group coordinator 192.168.144.1:9992 (id: 
2147482646 rack: null)
2020-06-16 10:54:02,295 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] 
Initiating transaction commit
2020-06-16 10:54:02,296 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] 
commitTransaction()
2020-06-16 10:54:02,299 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Commit list: {}
2020-06-16 10:54:02,301 INFO  [${sys:spring.application.name}] 
[consumer-0-C-1] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer 
clientId=consumer-bla-1, groupId=bla] Attempt to heartbeat failed for 
since member id consumer-bla-1-b3ad1c09-ad06-4bc4-a891-47a2288a830f is 
not valid.
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] 
[consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer 
clientId=consumer-bla-1, groupId=bla] Giving away all assigned 
partitions as lost since generation has been reset,indicating that 
consumer is no longer part of the group
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] 
[consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer 
clientId=consumer-bla-1, groupId=bla] Lost previously assigned 
partitions someTopic2-0
2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] 
bla: partitions lost: [someTopic2-0]
2020-06-16 10:54:02,303 INFO  [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] 
bla: partitions revoked: [someTopic2-0]
2020-06-16 10:54:02,303 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Commit list: {}

The normal instance that took over the partition and produced the record after the rebalance:

2020-06-16 10:53:46,536 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
Created new Producer: CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]
2020-06-16 10:53:46,537 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] 
beginTransaction()
2020-06-16 10:53:46,539 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created 
Kafka transaction on producer [CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]]
2020-06-16 10:53:46,556 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Sending offsets to transaction: {someTopic2- 
0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2020-06-16 10:53:46,563 INFO  [${sys:spring.application.name}] [kafka- 
producer-network-thread | producer-1d8e74d3-8986-4458-89b7- 
6d3e5756e213bla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent 
message=[im in transaction] with offset=[250] to topic something1
2020-06-16 10:53:46,566 INFO  [${sys:spring.application.name}] [kafka-        
producer-network-thread | producer-1d8e74d3-8986-4458-89b7- 
6d3e5756e213bla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] 
[Producer clientId=producer-1d8e74d3-8986-4458-89b7- 
6d3e5756e213bla.someTopic2.0, transactionalId=1d8e74d3-8986-4458-89b7- 
6d3e5756e213bla.someTopic2.0] Discovered group coordinator 
X.X.X.X:9992 (id: 1001 rack: null)
2020-06-16 10:53:46,668 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] 
Initiating transaction commit
2020-06-16 10:53:46,669 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] 
CloseSafeProducer 
[delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] 
commitTransaction()
2020-06-16 10:53:46,672 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Commit list: {}
2020-06-16 10:53:51,673 DEBUG [${sys:spring.application.name}] 
[consumer-0-C-1] 
[o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] 
Received: 0 records

I noticed that they both log exactly the same offsets to commit:

Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}

I assumed that when they tried to commit exactly the same thing, the broker would abort one of the transactions...

I also noticed that if I reduce transaction.timeout.ms to only 2 seconds, it doesn't abort the transaction no matter how long I freeze the instance in debug...

Maybe the timer for transaction.timeout.ms only starts once I send a message?

【Question Comments】:

    Tags: apache-kafka spring-kafka exactly-once


    【Solution 1】:

    You cannot use executeInTransaction at all here - see its Javadocs; it is intended for use when there is no active transaction, or if you explicitly don't want an operation to participate in an existing transaction.

    You need to add a KafkaTransactionManager to the listener container; it must have a reference to the same ProducerFactory as the template.

    Then the container will start the transaction and, if successful, send the offsets to the transaction.
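A minimal sketch of the wiring this describes (variable names are illustrative; the key point is that the container's KafkaTransactionManager and the KafkaTemplate share one ProducerFactory):

```java
ProducerFactory<String, String> pf = transactionalProducerFactory(); // illustrative name
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

ContainerProperties containerProps = new ContainerProperties("someTopic2");
containerProps.setMessageListener((MessageListener<String, String>) record ->
        // A plain send: it joins the transaction the container started, and
        // the container sends the consumed offsets to that same transaction.
        template.send("something1", record.key(), "im in transaction"));
// Same factory as the template - this is what makes the listener transactional.
containerProps.setTransactionManager(new KafkaTransactionManager<>(pf));
```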

    【Discussion】:

    • Hi Gary! Thanks for the reply! I tried what you suggested, and once I removed executeInTransaction I did manage to get the message through. But if I freeze my instance B in debug until the timeout, instance A takes over the record and sends it to the output topic; when I release the breakpoint, instance B processes and sends it again... and I have a duplicate in my output topic... I'll edit the question with the new code.
    • ((DefaultKafkaProducerFactory<PK, V>) res).setTransactionIdPrefix(UUID.randomUUID().toString()); - the producers must all have the same transactionIdPrefix for producer fencing to work - i.e. Kafka uses it to determine whether an old producer has been fenced. With Kafka 2.5 this is no longer necessary; see EOSMode.BETA.
    • I managed to get this working now, but I have 2 last questions. 1. I want to use the BETA mode, but version 2.6 is not in the Maven repository - is it a future release? 2. Once an instance becomes a zombie, it keeps running; my Kubernetes won't know it is down and won't restart it, so the load will slowly fall on fewer and fewer instances. Is there a best practice or some configuration to tell a zombie producer to restart? Thanks again!
    • 1. Yes, it is a future release; as long as your brokers are 2.5 you can use BETA with spring-kafka 2.5. 2. It recovers automatically; if that's not what you are seeing, ask a new question. No more comments here.
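Applying the advice from the comments, the producer factory would use a fixed prefix shared by every instance (the value "myApp.tx." is chosen for illustration), rather than a random UUID per instance:

```java
public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
    DefaultKafkaProducerFactory<PK, V> factory = new DefaultKafkaProducerFactory<>(props);
    if (isTransactional) {
        // Same literal prefix on every instance, so instance B registers the
        // same transactional.id as instance A for a given group/topic/partition,
        // and the broker can fence the zombie producer.
        factory.setTransactionIdPrefix("myApp.tx.");
        factory.setProducerPerConsumerPartition(true);
    }
    return factory;
}
```

On 2.5+ brokers with spring-kafka 2.5+, the alternative mentioned above is to enable EOSMode.BETA on the container properties, where fencing is based on the consumer group metadata instead of one transactional producer per group/topic/partition.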