【问题标题】:Kafka: Sarama, idempotence and transactional.idKafka:Sarama、幂等性和 transactional.id
【发布时间】:2021-06-10 02:27:42
【问题描述】:
【问题讨论】:
标签:
go
apache-kafka
kafka-producer-api
sarama
【解决方案1】:
Shopify/sarama 为 Kafka Exactly Once(幂等性)提供启用幂等性的生产者。但是对于下面的配置设置需要在那里。
来自Shopify/sarama/config.go
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
}
if c.Producer.Retry.Max == 0 {
return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
}
if c.Producer.RequiredAcks != WaitForAll {
return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
}
if c.Net.MaxOpenRequests > 1 {
return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
}
}
在Shopify/sarama 他们是如何做到的,在AsyncProducer 的transactionManager 中有一个producerEpoch ID。您可以参考Shopify/sarama/async_producer.go 中的文件。此 ID 使用生产者初始化进行初始化,并在成功生成每条消息时递增。读取bumpEpoch() 函数以在async_producer.go 文件中查看。
这是该生产者与代理的会话的序列 ID,它与每条消息一起发送。消息发布成功时递增。
阅读this example。它描述了幂等性的工作原理。
您在生产者会话事实方面是正确的。对于单个生产者会话,这正是曾经承诺过的。在序列失败后重新启动生产者时,可能存在重复。
当生产者重新启动时,会分配新的 PID。因此,幂等性仅被承诺为单个生产者会话。即使生产者在失败时重试请求,每条消息也会在日志中仅保留一次。根据生产者获取数据的来源,仍然可能存在重复。 Kafka 不会处理生产者收到的重复数据。因此,在某些情况下,您可能需要额外的重复数据删除系统。