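【Question title】: How to get a response via MassTransit?
【Posted】: 2021-12-03 15:21:40
【Question】:

I want to get a response (GetStatusResponse) from a consumer (GetStatusConsumer).

My request is placed in the RabbitMQ queue "getstatus", but my consumer is never invoked and the request times out.

The publishing method and the consumer are in the same project.

It seems to me that something is wrong in Startup.cs. Can you help me?

I have the following code in Startup.cs: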

...
services.AddSingleton(provider =>
            {
                var getStatusBusOptions = provider.GetRequiredService<IOptions<BusOptions>>().Value;
                var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
                {
                    var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =>
                    {
                        h.Username(getStatusBusOptions.UserName);
                        h.Password(getStatusBusOptions.Password);
                    });
                    sbc.ReceiveEndpoint("getstatus", ep =>
                    {
                        ep.Consumer<GetStatusConsumer>(provider);
                        ep.PrefetchCount = getStatusBusOptions.PrefetchCount;
                        ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);
                    });

                });


            return new GetStatusBus
            { 
                Bus = bus,
                HostUri = getStatusBusOptions.HostUri
            };
        });
...

And the following code in the GetStatusPublisher.cs class:

public class GetStatusPublisher : IGetStatusPublisher
{
    readonly GetStatusBus _bus;

    public GetStatusPublisher(GetStatusBus bus)
    {
        _bus = bus;
    }

    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        var serviceAddress = new Uri($"rabbitmq://rabbitmq.test.com/jgt/getstatus"); 
        var timeout = TimeSpan.FromSeconds(30); 

        var client = new MessageRequestClient<Tin, Tout>(_bus.Bus, serviceAddress, timeout);

        var resp = await client.Request(request); // <== times out here; the consumer (GetStatusConsumer) is never invoked

        return resp;
    }
}

Here is the publishing method:

...
 readonly IGetStatusPublisher _getStatusPublisher;
...
     var resp = await _getStatusPublisher.GetResponse<GetStatusRequest, GetStatusResponse>(statusReq);

The consumer has the following code:

public class GetStatusConsumer : MetricWriter, IConsumer<GetStatusRequest>
{
        public GetStatusConsumer(IMetrics metrics) : base(metrics)
        {
......
        }

        public async Task Consume(ConsumeContext<GetStatusRequest> context)
        {
....
         }

}

【Comments】:

    Tags: rabbitmq masstransit


    【Solution 1】:

    First, I don't think you are starting the bus. That's the main problem.
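With the older configuration style, the bus has to be started explicitly before the receive endpoint consumes anything. Here is a minimal sketch of one way to do that. It assumes GetStatusBus.Bus is declared as IBusControl (the question does not show that class), and GetStatusBusHostedService is a hypothetical name:

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

// Assumed shape of the wrapper from the question; the real class is not shown there.
public class GetStatusBus
{
    public IBusControl Bus { get; set; }
    public string HostUri { get; set; }
}

// Hypothetical hosted service that ties the bus lifetime to the application lifetime.
public class GetStatusBusHostedService : IHostedService
{
    readonly GetStatusBus _bus;

    public GetStatusBusHostedService(GetStatusBus bus) => _bus = bus;

    // Starting the bus connects to the broker and starts the configured receive endpoints.
    public Task StartAsync(CancellationToken cancellationToken) =>
        _bus.Bus.StartAsync(cancellationToken);

    // Stopping the bus shuts the endpoints down when the application stops.
    public Task StopAsync(CancellationToken cancellationToken) =>
        _bus.Bus.StopAsync(cancellationToken);
}
```

It could be registered with services.AddHostedService<GetStatusBusHostedService>(); after the AddSingleton call from the question.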

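    But...

    Are you using an old version of MassTransit? Your code is seriously out of date, and I'd suggest updating it to use the current version/syntax, because the code samples above have a lot of issues.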

    services.AddMassTransit(x =>
    {
        x.AddConsumer<GetStatusConsumer>();
    
        x.UsingRabbitMq((context, cfg) => 
        {
            var getStatusBusOptions = context.GetRequiredService<IOptions<BusOptions>>().Value;
    
            cfg.Host(new Uri(getStatusBusOptions.HostUri), h =>
            {
                h.Username(getStatusBusOptions.UserName);
                h.Password(getStatusBusOptions.Password);
            });
    
            cfg.PrefetchCount = getStatusBusOptions.PrefetchCount;
            cfg.ConcurrentMessageLimit = getStatusBusOptions.UseConcurrencyLimit;
    
            cfg.ConfigureEndpoints(context);
        });
    });
    services.AddMassTransitHostedService();
    services.AddGenericRequestClient();
    

    Then you can simply add a dependency on IRequestClient<T> to send the request and get the response. Your updated publisher code might look like this:

    public class GetStatusPublisher : 
        IGetStatusPublisher
    {
        readonly IServiceProvider _provider;

        public GetStatusPublisher(IServiceProvider provider)
        {
            _provider = provider;
        }
    
        public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
        {
            var client = _provider.GetRequiredService<IRequestClient<Tin>>();
    
            var response = await client.GetResponse<Tout>(request);
    
            return response.Message;
        }
    }
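The request client only completes once the consumer actually replies, and the consumer body is elided in the question. The following is a rough sketch of a consumer that responds. The GetStatusResponse shape and its Result property are assumptions for illustration, and the MetricWriter base class from the question is left out:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;

// Request shape inferred from the controller example in this thread.
public class GetStatusRequest
{
    public List<int> Statuses { get; set; }
    public string TerminalDescr { get; set; }
}

// Hypothetical response contract; the real one is not shown.
public class GetStatusResponse
{
    public string Result { get; set; } // assumed property
}

public class GetStatusConsumer : IConsumer<GetStatusRequest>
{
    public Task Consume(ConsumeContext<GetStatusRequest> context)
    {
        // RespondAsync sends the reply to the response address carried by the request,
        // which is what completes GetResponse<GetStatusResponse> on the client side.
        return context.RespondAsync(new GetStatusResponse { Result = "ok" });
    }
}
```

If the consumer runs but never calls RespondAsync, the client side still ends in a timeout.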
    

    【Comments】:

      【Solution 2】:

      If I use the following code, I get the error: "System.InvalidOperationException: Cannot resolve scoped service 'MassTransit.IRequestClient`1[GetStatusTry.Contracts.GetStatusRequest]' from root provider..... "

         public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
              {
                  try
                  {
                      var client = _provider.GetRequiredService<IRequestClient<Tin>>(); // <-- the error is thrown here
      
                      var response = await client.GetResponse<Tout>(request);
      
                      return response.Message;
                  }
                  catch (Exception ex)
                  {
                      Console.WriteLine(ex.Message);
                      throw;
                  }
      
              }
      

      But this code works fine:

      public class Dev2Controller : ControllerBase
      {
          IRequestClient<GetStatusRequest> _client;

          public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClient<GetStatusRequest> client)
          {
              _client = client;
          }

          [HttpPost]
          public async Task<GetStatusResponse> GetStatus2()
          {
              var req = new GetStatusRequest { Statuses = new List<int> { 1, 2, 3 }, TerminalDescr = "try to get status2" };

              var response = await _client.GetResponse<GetStatusResponse>(req);

              return (response.Message);
          }
      }

      【Comments】:

        【Solution 3】:

        This code works:

        public class GetStatusPublisher : IGetStatusPublisher
        {
            readonly IServiceProvider _provider;
        
            public GetStatusPublisher(IServiceProvider provider)
            {
                _provider = provider;
            }
        
            public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
            {
                try
                {
                    using (var _scope = _provider.CreateScope())
                    {
                        var client = _scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
        
                        var response = await client.GetResponse<Tout>(request);
        
                        return response.Message;
                    }
        
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    throw;
                }
        
            }
        }
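The reason the explicit scope helps can be reproduced without MassTransit. This is a small sketch using only Microsoft.Extensions.DependencyInjection, where ScopedClient is a hypothetical stand-in for a scoped registration such as IRequestClient<T>:

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a service registered with a scoped lifetime.
public class ScopedClient { }

public static class ScopeDemo
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddScoped<ScopedClient>();

        // validateScopes: true turns on the check that produced the error in Solution 2.
        using var root = services.BuildServiceProvider(validateScopes: true);

        try
        {
            // Resolving a scoped service directly from the root provider is rejected.
            root.GetRequiredService<ScopedClient>();
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }

        // Inside an explicit scope the same resolution succeeds,
        // and the instance is disposed together with the scope.
        using (var scope = root.CreateScope())
        {
            var client = scope.ServiceProvider.GetRequiredService<ScopedClient>();
            Console.WriteLine(client != null);
        }
    }
}
```

The controller in Solution 2 works for the same reason: ASP.NET Core creates a scope per HTTP request, so constructor injection there resolves from a scoped provider, not the root.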
        

        【Comments】:
