【问题标题】:Mass Transit - Produce messages in Web App and consume in Azure FunctionMass Transit - 在 Web 应用程序中生成消息并在 Azure 函数中使用
【发布时间】:2019-08-14 14:54:15
【问题描述】:

我有一个通过 Azure 服务总线发送消息并等待回复的 Web 应用:

var response = await this.createClient.GetResponse<Models.Response<CommandResponse>>(message, cancellationToken);

我有一个使用消息的服务总线触发器的 Azure 函数:

        [FunctionName("CommandHandler")]
        public Task HandleCommandAsync(
            [ServiceBusTrigger("input-queue", Connection = "AzureWebJobsServiceBus"), ServiceBusAccount("ServiceBus")]
            Message message,
            IBinder binder,
            ILogger logger,
            CancellationToken cancellationToken)
        {
            logger.LogInformation("Command Handler Function Invoked.");

            var result = System.Text.Encoding.UTF8.GetString(message.Body);
            var d = JsonConvert.DeserializeObject<dynamic>(result);
            string messageType = d.message.messageType.Value;

            var handler = Bus.Factory.CreateBrokeredMessageReceiver(
                binder,
                cfg =>
                    {
                        cfg.CancellationToken = cancellationToken;
                        cfg.SetLog(logger);
                        cfg.InputAddress = new Uri($"{this.secrets.Value.ServiceBusUri}/input-queue");
                        cfg.UseRetry(x => x.Intervals(10, 100, 500, 1000));

                            cfg.Consumer(() => this.customerConsumer);
                    });

            var handlerResult = handler.Handle(message);
            logger.LogInformation("Command Handler Function Completed.");
            return handlerResult;
        }

为了从网络应用程序发送消息,我必须配置消费者。我不确定为什么这是必要的,因为 Web 应用永远不需要直接引用消费者,但是如果没有以下代码,就不会发送任何消息。

            services.AddMassTransit(
                x =>
                {
                    x.AddConsumer<CustomerConsumer>();

                    x.AddBus(
                            provider => Bus.Factory.CreateUsingAzureServiceBus(
                                cfg =>
                                    {
                                        var host = cfg.Host(secrets.AzureWebJobsServiceBus, h => { });

                                        cfg.ReceiveEndpoint(
                                            host,
                                            "input-queue",
                                            ep =>
                                                {
                                                    ep.ConfigureConsumer<CustomerConsumer>(provider);

                                                    ep.PrefetchCount = 16;
                                                    ep.UseMessageRetry(r => r.Interval(2, 100));
                                                });
                                    }));

                    x.AddRequestClient<RegisterNewCustomerCommand>();
                });

问题是消息发送后,有时CommandHandler函数app触发并调用消费者的Handle消息。

有时 Web App 会无意中直接调用消费者(因为消费者在启动时注册并监听队列中的消息)。

这对我来说是不希望的。对于可伸缩性,只有 Azure 函数应该调用消费者。此外,消费者注入了仅在函数应用的WebHostStartUp 中注册的依赖项,因此如果直接从 Web 应用调用消费者会出错。

问题:如何打破 Web App 和消费者之间的依赖关系,使他们永远不会被 Web App 直接调用,而总是被函数应用触发器调用?有什么方法可以避免在 Web App 的 StartUp 方法中添加/配置消费者,同时仍然允许 Web App 通过GetResponse 发送消息?

更新 - 解决方案

ASP.NET Core 2.2 Startup 不需要对使用者有任何引用(除非您与我不同,您希望您的 Web 应用也使用消息)。

您需要将包含队列路径的服务总线 URL 添加到您的AddRequestClient。查看已接受的答案。

这对我有用。


            services.AddMassTransit(
                x =>
                    {
                        x.AddBus(
                            provider => Bus.Factory.CreateUsingAzureServiceBus(
                                cfg =>
                                    {
                                        cfg.Host(
                                            secrets.AzureWebJobsServiceBus,
                                            h => { h.TransportType = TransportType.Amqp; });
                                    }));

                        var serviceBusUri = new Uri($"{settings.ServiceBusUri}/input-queue");
                        x.AddRequestClient<RegisterNewCustomerCommand>
                    });
        }

【问题讨论】:

    标签: azureservicebus masstransit


    【解决方案1】:

    您不需要在客户端 API 上有使用者。我唯一能想到的就是你的拓扑设置不正确。

    1. 请提前确保队列存在,因为函数不会创建队列。
    2. 配置队列的 URI 地址,以便您的 Web API 可以在 AddRequestClient 调用中将其指定为参数 (_sb://host..../input-queue)。
    3. 利润!

    由于您配置请求客户端的方式,它可能使用的是发布而不是发送,因为它不知道队列地址。在这种情况下,将使用者添加到您的 Web API 会正确订阅主题以将消息转发到队列。您可以使用 Service Bus Explorer 之类的工具来查看它是如何在云中布局的。

    将 URI 添加到请求客户端的客户端应用程序应该可以解决问题,并将命令直接发送到队列。

    【讨论】:

    • 谢谢 - 它工作。我更新了我的问题以显示 ASP.NET Core 2.2 Web API Startup 应该是什么样子。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多