上个章节我们讲了kafka的环境安装(这里),现在主要来了解下Kafka使用,基于.net实现kafka的消息队列应用,本文用的是Confluent.Kafka,版本0.11.6
1、安装:
在NuGet程序包中搜索“Confluent.Kafka”下载安装即可
2、producer发送消息:
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using Confluent.Kafka; 5 using Confluent.Kafka.Serialization; 6 7 namespace KafKa 8 { 9 /// <summary> 10 /// Kafka消息生产者 11 /// </summary> 12 public sealed class KafkaProducer 13 { 14 /// <summary> 15 /// 生产消息并发送消息 16 /// </summary> 17 /// <param name="broker">kafka的服务器地址</param> 18 /// <param name="topic">kafka的消息主题名称</param> 19 /// <param name="partion">分区</param> 20 /// <param name="message">需要传送的消息</param> 21 public bool Produce(string broker, string topic, int partion, string message) 22 { 23 bool result = false; 24 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 25 { 26 throw new ArgumentNullException("Kafka消息服务器地址不能为空!"); 27 } 28 29 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 30 { 31 throw new ArgumentNullException("消息所属的主题不能为空!"); 32 } 33 34 if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0) 35 { 36 throw new ArgumentNullException("消息内容不能为空!"); 37 } 38 39 var config = new Dictionary<string, object> 40 { 41 { "bootstrap.servers", broker } 42 }; 43 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 44 { 45 var deliveryReport = producer.ProduceAsync(topic, null, message, partion); 46 deliveryReport.ContinueWith(task => 47 { 48 if (task.Result.Error.Code == ErrorCode.NoError) 49 { 50 result = true; 51 } 52 //可以在控制台使用以下语句 53 //Console.WriteLine("Producer:" + producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value); 54 }); 55 56 producer.Flush(TimeSpan.FromSeconds(10)); 57 } 58 return result; 59 } 60 } 61 }