上个章节我们讲了kafka的环境安装(这里),现在主要来了解下Kafka使用,基于.net实现kafka的消息队列应用,本文用的是Confluent.Kafka,版本0.11.6

1、安装:

在NuGet程序包中搜索“Confluent.Kafka”下载安装即可

 

2、producer发送消息:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Text;
 4 using Confluent.Kafka;
 5 using Confluent.Kafka.Serialization;
 6 
 7 namespace KafKa
 8 {
 9     /// <summary>
10     /// Kafka消息生产者
11     /// </summary>
12     public sealed class KafkaProducer
13     {
14         /// <summary>
15         /// 生产消息并发送消息
16         /// </summary>
17         /// <param name="broker">kafka的服务器地址</param>
18         /// <param name="topic">kafka的消息主题名称</param>
19         /// <param name="partion">分区</param>
20         /// <param name="message">需要传送的消息</param>
21         public bool Produce(string broker, string topic, int partion, string message)
22         {
23             bool result = false;
24             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
25             {
26                 throw new ArgumentNullException("Kafka消息服务器地址不能为空!");
27             }
28 
29             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
30             {
31                 throw new ArgumentNullException("消息所属的主题不能为空!");
32             }
33 
34             if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
35             {
36                 throw new ArgumentNullException("消息内容不能为空!");
37             }
38 
39             var config = new Dictionary<string, object>
40             {
41                 { "bootstrap.servers", broker }
42             };
43             using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
44             {
45                 var deliveryReport = producer.ProduceAsync(topic, null, message, partion);
46                 deliveryReport.ContinueWith(task =>
47                 {
48                     if (task.Result.Error.Code == ErrorCode.NoError)
49                     {
50                         result = true;
51                     }
52                     //可以在控制台使用以下语句
53                     //Console.WriteLine("Producer:" + producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value);
54                 });
55 
56                 producer.Flush(TimeSpan.FromSeconds(10));
57             }
58             return result;
59         }
60     }
61 }
View Code

相关文章:

  • 2021-06-09
  • 2021-06-04
  • 2021-11-07
  • 2021-07-14
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2021-08-17
  • 2022-02-08
  • 2021-04-23
  • 2022-12-23
  • 2022-12-23
  • 2021-06-18
  • 2021-05-29
相关资源
相似解决方案