【问题标题】:Masstransit, return the consumed messageMasstransit,返回消费消息
【发布时间】:2020-05-14 18:00:31
【问题描述】:

我是 Masstransit 的新手,我不确定如何配置它。 有没有办法将消费的消息返回到我调用 Consume 方法的地方? Consume 方法返回 IBusControl 但我无法获取消息。

实际的 Consume 方法是通过 ReceiveEndpoint 以某种方式调用的,并且此 Consume 方法返回一个保存消息的 Task。所以我的问题是,如何将此消息返回到 ConsumeMessage 方法,以便我可以使用结果填充 InventoryItemObject。

public IInventory ConsumeMessage(string domainId)
    {
        using (consumer = new MassTransitConsumer(MassTransitConsumer.GetConfiguration()))
        {
            try
            {
                var message = consumer.Consume(domainId).StartAsync().Result;
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message);
            }
        }

        return new InventoryItemObject();
    }

        public IBusControl Consume(string queue)
    {

        var endpoint = new Uri(string.Concat(configuration.HostAddress, "/", queue));

        try
        {
            if (configuration.QueueType == "RabbitMQ" || string.IsNullOrEmpty(configuration.QueueType))
            {
                bus = Bus.Factory.CreateUsingRabbitMq(rabbit =>
                {
                    IRabbitMqHost host = rabbit.Host(configuration.HostAddress, settings =>
                    {
                        settings.Username(configuration.Username);
                        settings.Password(configuration.Password);
                    });

                    rabbit.ReceiveEndpoint(host, queue, c =>
                    {
                        c.UseConcurrencyLimit(1);
                        c.UseRetry(retryConfig => retryConfig.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)));
                        c.Consumer(() => new InventoryConsumer(_connectionString));
                    });
                });

                bus.Start();
            }
        }
        catch (Exception ex)
        {
            Logger.LogError(ex.Message);
        }

        return bus;
    }

public Task Consume(ConsumeContext<IInventory> context)
    {
        var inventory = context.Message;

        _logger.LogDebug($"Order object: {JsonConvert.SerializeObject(inventory, Formatting.Indented)}");
        _logger.LogDebug("Stating timer for UserChangeEventConsumer:");

        var timer = new Stopwatch();
        timer.Start();

        var message = context.Message;

        SetMessageAsConsumed(message);

        timer.Stop();
        _logger.LogDebug($"UserChangeEventConsumer completed in {timer.Elapsed.TotalSeconds} seconds.");

        return Task.FromResult(message);
    }

【问题讨论】:

    标签: .net rabbitmq masstransit


    【解决方案1】:

    您是否有特定原因要退回邮件?据我所知,最好不要让 Consume 方法返回消息,因为那里不应该有人听。

    如果你想向发布者返回一些东西,你应该使用request/response pattern

    【讨论】:

    • 我的客户希望我将所有使用的消息转换为将在 Swagger 中返回的特定对象。当它们被消耗时,它们就消失了,没有其他方法可以得到它们。
    • 也许你可以使用中间件来拦截消息? masstransit-project.com/advanced/middleware我不完全确定您所说的“以 Swagger 回归”是什么意思。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多