【问题标题】:Creating Kafka producer in .NET by passing in message通过传入消息在 .NET 中创建 Kafka 生产者
【发布时间】:2018-10-16 21:30:41
【问题描述】:

我想知道研发部门是否有人对使用 .NET 的 Kafka 有任何经验。下面的代码 sn-p 来自客户端,它是用 Java 编写的。似乎 Java 的 Kafka 库比 .NET 的库要丰富得多。我正在尝试做的是在客户端站点的远程服务器上启动一个 Kafka 生产者,以便通过 Kafka 生产者传递 RTA 状态。

我需要做的是在 .NET 中重新创建以下代码,尤其是最后一行“openInterfacesSubscriber.send()”。我正在使用来自 Confluent 的 .NET 包。任何帮助将不胜感激。

Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
("xxx.xx.xxx.xxx:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "ept-oi-log");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class); 

DefaultKafkaProducerFactory<Integer, String> producer = new 
DefaultKafkaProducerFactory<>( producerConfigs(props));

KafkaTemplate<Integer, String> openInterfacesSubscriber = new KafkaTemplate<> 
(producer); 

for (all in { "AGENTBYACCOUNTMEASURES", "AGENTBYROUTINGSERVICEMEASURES") {

String subRequest = String.format("   {\"userName\":\"%s\",\"password\":\"%s\",\"subscriptionRequestId\":\"5d09vjfgk\",\"request\":\"SUBSCRIBE\", \"measuresStream\":\"% s\",\"version\":\"3.4\"}", "MikeGrey@odl.lab",  "Avaya123", measureName); 

// THERE IS NO KAFKA SECURITY HERE: USERNAME/PWD ABOVE IS FOR THE subRequest
// STRING ONLY, ANYONE CAN CONNECT TO THIS KAKFA INSTANCE.

openInterfacesSubscriber.send("realtimesubscriptionrequest", 0, i++, subRequest);

【问题讨论】:

    标签: .net apache-kafka producer


    【解决方案1】:

    欢迎来到 Stack Overflow!

    .NET API 仍在开发中,在 Windows 上托管也有点棘手(或者我发现)。

    我一直在使用实验性的 .net Kafka 客户端,发现“映射”到您发布的 java 源代码很容易:

    • 配置图就变成了字典
    • 序列化/反序列化设置可以通过Producer 构造函数的参数在代码中完成。您可能需要更改编码。
    • KafkaTemplate&lt;&gt;.send 映射到 Producer.ProduceAsync

    请记住,客户端仍在不断发展,这是您的代码的一个版本,它适用于我提到的 nuget 版本:

    static void Main(string[] args)
    {          
        // Client: .net core console app 2.0 / Confluent.Kafka nuget 1.0.0-experimental-2
        // Server: Kafka 1.0.0
        Dictionary<string, object> config = new Dictionary<string, object>()
        {
            { "bootstrap.servers", "ept-oi-log" },
            { "group.id", "ept-oi-log" },
            { "enable.auto.commit", true },
            { "session.timeout.ms", 15000 },
            { "client.id", "1" },
        };
    
        Producer(config).Wait();
    }
    
    static async Task Producer(IEnumerable<KeyValuePair<string, object>> kafkaConfig)
    {
        var kafkaTopic = "realtimesubscriptionrequest";
        using (var producer = new Producer<int, string>(kafkaConfig, new IntSerializer(), new StringSerializer(System.Text.Encoding.UTF8)))
        {
            int i = 0;
            foreach (var measureName in new[] { "AGENTBYACCOUNTMEASURES", "AGENTBYROUTINGSERVICEMEASURES" })
            {
                String subRequest = String.Format(@"   {{""userName"":""{0}"",""password"":""{1}"",""subscriptionRequestId"":""5d09vjfgk"",""request"":""SUBSCRIBE"", ""measuresStream"":""{2}"",""version"":""3.4""}}", 
                    "ggghhhh@xxx.lab", "xxxxxxx", measureName);
                await producer.ProduceAsync(kafkaTopic, new Message<int, string>() { Key = i++, Value = subRequest });
            }
        }
    }
    

    【讨论】:

    • 这里不应该说谢谢,但无论如何感谢彼得,感谢您在此问题上的帮助。我在想 ProduceAsync 是正确的使用方法,但我绝对没有将参数列表设置为与您在此处设置的相同。自从我发布这个问题以来,我一直在度假,直到 4 月 23 日我才会回到工作岗位,我很想试试你的建议。如果这对我有用,我会告诉你的。再次感谢您的热心帮助。
    猜你喜欢
    • 2018-01-17
    • 2023-01-21
    • 1970-01-01
    • 2017-01-04
    • 1970-01-01
    • 2018-11-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多