【发布时间】:2021-09-09 22:48:44
【问题描述】:
我正在使用Confluent Kafka .NET 为分区主题创建消费者。
由于 Confluent Kafka .NET 不支持批量消费,我构建了一个函数来消费消息,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用具有不同分区的结果并返回到目前为止我能够使用的任何数量的消息时我停止构建批处理的原因.
目标或目的:我希望能够处理我在批处理中返回的消息,并仅提交这些消息的偏移量。即:
| Message Consumed From Partition | Offset | Stored in Batch |
|---|---|---|
| 0 | 0 | Yes |
| 0 | 1 | Yes |
| 2 | 0 | No |
根据上表,我想处理从分区 0 收到的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对 ConsumeBatch 的另一次调用中拾取。
要提交,我只需调用同步Commit 函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上表中显示的批次的第二条消息的偏移量(分区 0 - 偏移量 1)。
问题:
问题是,由于某种原因,当我像上面显示的那样构建一个批处理时,我决定不处理的消息由于验证而被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。
正如您在下面的使用者配置中所见,我将 EnableAutoCommit 和 EnableAutoOffsetStore 都设置为 false。我认为这足以让消费者不对偏移量做任何事情,并且能够在另一个 Consume 调用中接收被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。
如果可能的话,任何人都可以告诉我我在这里缺少什么以实现所需的行为吗?
构建批处理的简化版函数:
public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
int latestPartition = -1; // The partition from where we consumed the last message
for (int i = 0; i < batchSize; i++)
{
var result = _consumer.Consume(100);
if (result != null)
{
if (latestPartition == -1 || result.Partition.Value == latestPartition)
{
consumedMessages.Add(result);
latestPartition = result.Partition.Value;
}
else
break;
}
else
break;
}
return consumedMessages;
}
ConsumerConfig 用于实例化我的消费者客户端:
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _bootstrapServers,
EnableAutoCommit = false,
AutoCommitIntervalMs = 0,
GroupId = "WorkerConsumers",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoOffsetStore = false,
};
其他信息: 正在测试:
- 1 个主题,6 个分区,复制因子为 2
- 3 个经纪人
- 1 个属于消费者组的单线程消费者客户端
- 在 Windows 10 上使用 wsl2 的本地环境
【问题讨论】:
标签: c# .net-core apache-kafka librdkafka confluent-kafka-dotnet