【问题标题】:How to create a single producer?如何创建单个生产者?
【发布时间】:2022-01-07 22:32:26
【问题描述】:

我有一个来自 API 的消息列表,我需要映射这个列表中的元素,将它们发送给生产者并一一读取。 多个用户可以同时发送消息列表。 我需要按顺序映射列表,而不是在一个列表完全完成之前阅读另一个列表。 由于当前结构中的每个请求都会创建一个新的生产者,因此我无法按顺序读取传入的消息。 如何通过创建单个生产者来做到这一点?

public class MailController : BaseController
    {
        [HttpPost("fatura")]
        //[AllowAnonymous]
        public bool fatura([FromBody] List<MailMessage> mailMessages)
        {
            FaturaProducer.Produce(mailMessages);
            return default;
        }
}

public class FaturaProducer : FProducer<MailMessage>
    {
        public static Task Produce(List<MailMessage> data)
        {
           Produce(ConfigurationManager.KafkaSettings.Topics[Topics.FaturaKaydetViaTp].TopicName, data);
           return default;
        }
    }

public class FProducer<T> : ProducerBase2
    {
        private static readonly ObjectSerializer<T> serializer = new ObjectSerializer<T>();
        //public static IProducer<Null, T> producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build();
        public static async Task<DeliveryResult<Null, T>> Produce(string topic,List<T> data)
        {
            try
            {
                LogManager.Logger.Debug("Producer initiating for {topic}", topic);

                using (var producer = new ProducerBuilder<Null, T>(producerConfig).SetValueSerializer(serializer).Build())
                {
                    LogManager.Logger.Debug("Producer initiated");

                    LogManager.Logger.Debug("Producing async");

                    try
                    {
                        foreach (var item in data)
                        {
                            producer.Poll(TimeSpan.FromSeconds(5));
                            await producer.ProduceAsync(topic, new Message<Null, T> { Value = item });
                        }
                       
                    }
                    catch (ProduceException<Null, string> ex)
                    {
                        LogManager.Logger.Fatal(ex, "Delivery failed: {reason}", ex.Error.Reason);
                        throw;
                    }
                }
            }
            catch (Exception ex)
            {
                LogManager.Logger.Fatal(ex, "Producer failed for {topic}", topic);
            }

            return null;
        }

    }

【问题讨论】:

    标签: c# apache-kafka confluent-kafka-dotnet


    【解决方案1】:

    目前,您正在遍历列表,调用您的 Produce 函数,并为每条消息

    创建一个生产者

    或者,创建一个新的 Produce 方法,接受整个列表,然后用 using (var producer 块包装你的 foreach 循环,这样所有消息都只使用一个生产者,记住你应该不是return 在第一个循环项目上。无论如何,你没有对 DeliveryResult 做任何事情,所以让这个方法无效。

    【讨论】:

    • 我尝试给producer发送一个list,但是每次请求都会创建一个新的producer,如何单挑出来?
    • 我看不出有什么问题。但如果必须,您需要在应用程序的一开始就创建一个生产者对象,作为一个静态单例,并自行管理它,而不是只在需要它的代码部分周围使用 using 语句
    猜你喜欢
    • 1970-01-01
    • 2020-06-09
    • 1970-01-01
    • 2023-01-09
    • 1970-01-01
    • 2019-07-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多