【发布时间】:2021-07-09 21:20:10
【问题描述】:
我正在努力了解卡夫卡。我制作的一个类似于 Hello World 的应用程序是 Producer 不使用消息的密钥。所有文档和教程都说消息应该以循环方式传播给消费者,但我的测试表明并非如此。它使用 Visual Studio 中的默认示例解决方案(带有天气预报)。每次查询 API 时,生成的预测也会发布到 Kafka。
生产者无非如下,在一个API控制器类中(每次调用这个,数组里有5个天气预报):
ProducerConfig config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = $"KafkaResearch-{Dns.GetHostName()}"
};
using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config).Build())
{
foreach (var forecast in weatherForecasts)
{
string value = JsonSerializer.Serialize(forecast);
Message<string, string> message = new Message<string, string> { Value = value };
producer.Produce("weather-forecasts", message, DeliveryHandler);
}
producer.Flush();
}
消息都是不同的。它们每次都包含随机生成的数据:
weatherForecasts = Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Today.AddDays(index),
TemperatureC = rng.Next(-20, 55),
Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();
消费者是一个非常简单的控制台应用程序:
static void Main(string[] args)
{
Console.WriteLine("Press any key to abort");
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "foo",
AutoOffsetReset = AutoOffsetReset.Latest
};
using (var consumer = new ConsumerBuilder<string, string>(config).Build())
{
consumer.Subscribe("weather-forecasts");
while (!Console.KeyAvailable)
{
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(500));
if (consumeResult != null)
{
Console.WriteLine($"Message received on partition {consumeResult.Partition}, key: {consumeResult.Message.Key ?? "<--null-->"}");
Console.WriteLine($" {consumeResult.Message.Value}");
}
}
consumer.Close();
}
}
主题由 12 个分区创建:
kafka-topics --bootstrap-server localhost:9092 --create --partitions 12 --topic weather-forecasts
当我使用浏览器向 API 发送垃圾邮件时,我希望分区上的(某种)均匀分布,但远非如此。 12 个分区中只有 4 个得到消息,所以当使用 5 个消费者时,其中一个是空闲的。
如果我在一条消息中以数组的形式发送所有天气预报(因此在生产者中没有foreach),所有消息每次都分配到同一个分区。
根据 cmets 的要求:代码如下所示:
ProducerConfig config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = $"KafkaResearch-{Dns.GetHostName()}"
};
using (IProducer<string, string> producer = new ProducerBuilder<string, string>(config).Build())
{
string value = JsonSerializer.Serialize(weatherForecasts);
Message<string, string> message = new Message<string, string> { Value = value };
producer.Produce("weather-forecasts", message, DeliveryHandler);
producer.Flush();
}
当我在消息中添加一个随机密钥时,所有消费者的分布都非常好。
我尝试在ProducerConfig 中指定Partitioner。这改变了消息分配到哪个分区,但仍然只产生很少的分区(额外问题:为什么分区器是生产者而不是主题的设置?我希望所有生产者都必须使用相同的方法分区...)。
kafka-topics --version
2.7.1 (Commit:61dbce85d0d41457)
使用 Confluent.Kafka Nuget 包(由 Confluent Inc.,Andreas Heider 提供)1.5.3 版
感谢您的任何见解,
基多。
【问题讨论】:
-
该数组包含每次生成的 5 个不同的天气预报。每个天气预报都包含一个随机温度(-20 .. 55 C)和一个随机摘要(十个字符串之一)。所以每条信息都应该不同。但即使没有,消息本身也不会用于分区,是吗?
-
github.com/confluentinc/confluent-kafka-dotnet/issues/1346 - 我怀疑问题是生产者正在正确地进行循环 - 但是你不断地旋转新的生产者。如果你只使用一个生产者,它会起作用吗?
-
@mjwills 将其重构为每次都重用同一个生产者(使用 Core 的 DI 的 AddSingleton)。现在工作。如果您重新创建生产者,显然“随机”是非常确定的......如果您将评论放在答案中,我会接受它。不过,我认为这是制片人的一个错误。出于性能原因,您希望重用生产者而不是每次都创建它,但随机应该是随机的。
-
请注意,“循环”和“随机”非常不同。你从哪里得到的想法是随机的?
-
(bonus question: why is the Partitioner a setting on the Producer instead of the topic?因为 95% 的时间您将使用单例生产者和标准化的分区系统。如果你不想这样做,你可以这样做,但你需要不止一个制作人。
标签: c# .net apache-kafka