【问题标题】:How to programatically create topic and send message to Kafka using C# dot net client如何使用 C# dot net 客户端以编程方式创建主题并将消息发送到 Kafka
【发布时间】:2020-05-18 18:13:49
【问题描述】:

我是 kafka 的新手,我想尝试创建主题并从我的 .net 应用程序向 kafka 发送消息。 我正在使用 kafka.net dll 并成功创建主题 使用此代码:

        Uri uri = new Uri("http://localhost:9092");

        string topic = "testkafka";

        string payload = "test msg";

        var sendMsg = new Thread(() =>

        {

            KafkaNet.Protocol.Message msg = new KafkaNet.Protocol.Message(payload);

            var options = new KafkaOptions(uri);

            var router = new KafkaNet.BrokerRouter(options);

            var client = new Producer(router);

            client.SendMessageAsync(topic, new List<KafkaNet.Protocol.Message> { msg }).Wait();

        });

        sendMsg.Start();

但我看不到任何消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testkafka --from-beginning

谁能帮我举个例子? 谢谢。

【问题讨论】:

    标签: c# .net apache-kafka kafka-consumer-api kafka-topic


    【解决方案1】:

    对于这两个操作,您都可以使用cofluent-kafka-dotnet 客户端。


    为了以编程方式create a topic:

    static async Task CreateTopicAsync(string bootstrapServers, string topicName) {
    
        using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
            try {
                await adminClient.CreateTopicsAsync(new TopicSpecification[] { 
                        new TopicSpecification { Name = 'myTopicName', ReplicationFactor = 1, NumPartitions = 1 } });
                } 
                catch (CreateTopicsException e) {
                    Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
                }
            }
        }
    

    为了产生消息:

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    
    class Program
    {
        public static async Task Main(string[] args)
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    
            // If serializers are not specified, default serializers from
            // `Confluent.Kafka.Serializers` will be automatically used where
            // available. Note: by default strings are encoded as UTF8.
            using (var p = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    var dr = await p.ProduceAsync("myTopicName", new Message<Null, string> { Value="test" });
                    Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }
    

    【讨论】:

    • 谢谢。有没有 cofluent 的建议?
    • @maya 为什么没有融合?这只是一个 nuget。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-30
    • 1970-01-01
    • 2023-03-16
    • 2014-11-19
    • 2018-10-11
    • 2010-12-11
    相关资源
    最近更新 更多