【发布时间】:2018-10-08 10:50:52
【问题描述】:
confluent kafka 文档说,一个 Consumer 类的定义如下:
Class Consumer<TKey, TValue>
上面的消费者类实现了一个高级的 Apache Kafka 消费者(具有键和值反序列化)。
我了解 TKey 和 TValue 用于反序列化从生产者发送的密钥。例如,像
从生产者那里发送一个密钥看起来像
var deliveryReport = producer.ProduceAsync(topicName, key, val);
在消费者端接收字符串键看起来像
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(topics);
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
while (!cancelled)
{
Message<Ignore, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
因为我们传入一个key,所以Consumer被初始化为
Consumer<Ignore, string>
消息被初始化为
Message<Ignore, String>
毕竟,我的问题是,密钥的反序列化真正意味着什么?为什么我们需要这样做?还有,为什么我们需要传入一个键值对Ignore, String来进行反序列化?
【问题讨论】:
标签: c# apache-kafka kafka-consumer-api confluent-platform