【问题标题】:Why is Kafka distribution over partitions far from even when no key is used?为什么即使没有使用密钥,Kafka 也不会在分区上分布?
【发布时间】:2021-07-09 21:20:10
【问题描述】:

我正在努力了解卡夫卡。我制作的一个类似于 Hello World 的应用程序是 Producer 不使用消息的密钥。所有文档和教程都说消息应该以循环方式传播给消费者,但我的测试表明并非如此。它使用 Visual Studio 中的默认示例解决方案(带有天气预报)。每次查询 API 时,生成的预测也会发布到 Kafka。

生产者无非如下,在一个API控制器类中(每次调用这个,数组里有5个天气预报):

ProducerConfig config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = $"KafkaResearch-{Dns.GetHostName()}"
};

using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config).Build())
{
    foreach (var forecast in weatherForecasts)
    {
        string value = JsonSerializer.Serialize(forecast);
        Message<string, string> message = new Message<string, string> { Value = value };
    
        producer.Produce("weather-forecasts", message, DeliveryHandler);
    }
    producer.Flush();
}

消息都是不同的。它们每次都包含随机生成的数据:

weatherForecasts = Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
    Date = DateTime.Today.AddDays(index),
    TemperatureC = rng.Next(-20, 55),
    Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();

消费者是一个非常简单的控制台应用程序:

static void Main(string[] args)
{
    Console.WriteLine("Press any key to abort");

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "foo",
        AutoOffsetReset = AutoOffsetReset.Latest                
    };

    using (var consumer = new ConsumerBuilder<string, string>(config).Build())
    {
        consumer.Subscribe("weather-forecasts");

        while (!Console.KeyAvailable)
        {
            var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));

            if (consumeResult != null)
            {
                Console.WriteLine($"Message received on partition {consumeResult.Partition}, key: {consumeResult.Message.Key ?? "<--null-->"}");
                Console.WriteLine($"  {consumeResult.Message.Value}");
            }
        }

        consumer.Close();
    }
}

主题由 12 个分区创建:

kafka-topics --bootstrap-server localhost:9092  --create --partitions 12 --topic weather-forecasts
 

当我使用浏览器向 API 发送垃圾邮件时,我希望分区上的(某种)均匀分布,但远非如此。 12 个分区中只有 4 个得到消息,所以当使用 5 个消费者时,其中一个是空闲的。

如果我在一条消息中以数组的形式发送所有天气预报(因此在生产者中没有foreach),所有消息每次都分配到同一个分区。

根据 cmets 的要求:代码如下所示:

ProducerConfig config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = $"KafkaResearch-{Dns.GetHostName()}"
};

using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config).Build())
{
    string value = JsonSerializer.Serialize(weatherForecasts);

    Message<string, string> message = new Message<string, string> { Value = value };
    producer.Produce("weather-forecasts", message, DeliveryHandler);
    producer.Flush();
}

当我在消息中添加一个随机密钥时,所有消费者的分布都非常好。

我尝试在ProducerConfig 中指定Partitioner。这改变了消息分配到哪个分区,但仍然只产生很少的分区(额外问题:为什么分区器是生产者而不是主题的设置?我希望所有生产者都必须使用相同的方法分区...)。

kafka-topics --version
2.7.1 (Commit:61dbce85d0d41457)

使用 Confluent.Kafka Nuget 包(由 Confluent Inc.,Andreas Heider 提供)1.5.3 版

感谢您的任何见解,

基多。

【问题讨论】:

  • 该数组包含每次生成的 5 个不同的天气预报。每个天气预报都包含一个随机温度(-20 .. 55 C)和一个随机摘要(十个字符串之一)。所以每条信息都应该不同。但即使没有,消息本身也不会用于分区,是吗?
  • github.com/confluentinc/confluent-kafka-dotnet/issues/1346 - 我怀疑问题是生产者正在正确地进行循环 - 但是你不断地旋转新的生产者。如果你只使用一个生产者,它会起作用吗?
  • @mjwills 将其重构为每次都重用同一个生产者(使用 Core 的 DI 的 AddSingleton)。现在工作。如果您重新创建生产者,显然“随机”是非常确定的......如果您将评论放在答案中,我会接受它。不过,我认为这是制片人的一个错误。出于性能原因,您希望重用生产者而不是每次都创建它,但随机应该是随机的。
  • 请注意,“循环”和“随机”非常不同。你从哪里得到的想法是随机的
  • (bonus question: why is the Partitioner a setting on the Producer instead of the topic? 因为 95% 的时间您将使用单例生产者和标准化的分区系统。如果你不想这样做,你可以这样做,但你需要不止一个制作人。

标签: c# .net apache-kafka


【解决方案1】:

问题是Producer 进行循环工作,而您没有重复使用相同的Producer(您保持new 使用多个)。

所以你需要使用Producer 作为单例来使循环逻辑有用。另见https://github.com/confluentinc/confluent-kafka-dotnet/issues/1346

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-12-12
    • 1970-01-01
    • 2012-04-12
    • 2019-06-26
    • 2020-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多