【问题标题】:Kafka: Consume partition with manual batching - Messages are being skippedKafka:使用手动批处理消耗分区 - 正在跳过消息
【发布时间】:2021-09-09 22:48:44
【问题描述】:

我正在使用Confluent Kafka .NET 为分区主题创建消费者。

由于 Confluent Kafka .NET 不支持批量消费,我构建了一个函数来消费消息,直到达到批量大小。此函数的想法是仅使用来自同一分区的消息构建批处理,这就是为什么一旦我使用具有不同分区的结果并返回到目前为止我能够使用的任何数量的消息时我停止构建批处理的原因.

目标或目的:我希望能够处理我在批处理中返回的消息,并仅提交这些消息的偏移量。即:

Message Consumed From Partition Offset Stored in Batch
0 0 Yes
0 1 Yes
2 0 No

根据上表,我想处理从分区 0 收到的两条消息。来自分区 2 的消息将被忽略,并且(希望)稍后在对 ConsumeBatch 的另一次调用中拾取。

要提交,我只需调用同步Commit 函数,将我处理的最新消息的偏移量作为参数传递。在这种情况下,我将传递上表中显示的批次的第二条消息的偏移量(分区 0 - 偏移量 1)。

问题:

问题是,由于某种原因,当我像上面显示的那样构建一个批处理时,我决定不处理的消息由于验证而被永远忽略。即:分区 2 的消息 0 将永远不会被消费者再次拾取。

正如您在下面的使用者配置中所见,我将 EnableAutoCommitEnableAutoOffsetStore 都设置为 false。我认为这足以让消费者不对偏移量做任何事情,并且能够在另一个 Consume 调用中接收被忽略的消息,但事实并非如此。无论我的配置如何,偏移量都会以某种方式增加到每个分区的最新消费消息。

如果可能的话,任何人都可以告诉我我在这里缺少什么以实现所需的行为吗?

构建批处理的简化版函数:

public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
{
    List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();

    int latestPartition = -1; // The partition from where we consumed the last message

    for (int i = 0; i < batchSize; i++)
    {
        var result = _consumer.Consume(100);
        
        if (result != null)
        {
            if (latestPartition == -1 || result.Partition.Value == latestPartition)
            {
                consumedMessages.Add(result);
                latestPartition = result.Partition.Value;
            }
            else
                break;
        }
        else
            break;
    }

    return consumedMessages;
}

ConsumerConfig 用于实例化我的消费者客户端:

_consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _bootstrapServers,
            EnableAutoCommit = false,
            AutoCommitIntervalMs = 0,
            GroupId = "WorkerConsumers",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoOffsetStore = false,
        };

其他信息: 正在测试:

  • 1 个主题,6 个分区,复制因子为 2
  • 3 个经纪人
  • 1 个属于消费者组的单线程消费者客户端
  • 在 Windows 10 上使用 wsl2 的本地环境

【问题讨论】:

    标签: c# .net-core apache-kafka librdkafka confluent-kafka-dotnet


    【解决方案1】:

    关键是使用Seek 函数将分区的偏移量重置为特定位置,以便可以将忽略的消息作为另一批次的一部分再次拾取。

    在上面的同一个函数中:

    public IEnumerable<ConsumeResult<string, string>> ConsumeBatch(int batchSize)
    {
        List<ConsumeResult<string, string>> consumedMessages = new List<ConsumeResult<string, string>>();
    
        int latestPartition = -1; // The partition from where we consumed the last message
    
        for (int i = 0; i < batchSize; i++)
        {
            var result = _consumer.Consume(100);
        
            if (result != null)
            {
                if (latestPartition == -1 || result.Partition.Value == latestPartition)
                {
                    consumedMessages.Add(result);
                    latestPartition = result.Partition.Value;
                }
                else
                {
                    // This call will guarantee that this message that will not be included in the current batch, will be included in another batch later
                    _consumer.Seek(result.TopicPartitionOffset); // IMPORTANT LINE!!!!!!!
                    break;
                }
            }
            else
                break;
        }
    
        return consumedMessages;
    }
    

    我认为一般来说,如果您想在不以任何方式更改偏移量的情况下使用消息(有点偷看主题分区),您可以调用Consume,然后使用Seek(result.TopicPartitionOffset) 设置该主题分区的偏移量回到使用消息之前的位置。

    【讨论】:

      猜你喜欢
      • 2021-02-01
      • 2019-12-15
      • 2016-04-18
      • 1970-01-01
      • 2018-11-21
      • 2019-08-04
      • 2017-08-11
      • 1970-01-01
      • 2020-05-09
      相关资源
      最近更新 更多