【发布时间】:2021-12-03 15:21:40
【问题描述】:
我想从消费者 (GetStatusConsumer) 获得响应 (GetStatusResponse)。
我的请求被放入 Rabbit 队列“getstatus”,但我的消费者没有上升并且发生超时。
Publish-method 和 Consumer 嵌套在一个项目中
在我看来 Startup.cs 有问题。你能帮帮我吗?
我在 Startup.cs 中有以下代码
...
services.AddSingleton(provider =>
{
var getStatusBusOptions = provider.GetRequiredService<IOptions<BusOptions>>().Value;
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =>
{
h.Username(getStatusBusOptions.UserName);
h.Password(getStatusBusOptions.Password);
});
sbc.ReceiveEndpoint("getstatus", ep =>
{
ep.Consumer<GetStatusConsumer>(provider);
ep.PrefetchCount = getStatusBusOptions.PrefetchCount;
ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);
});
});
return new GetStatusBus
{
Bus = bus,
HostUri = getStatusBusOptions.HostUri
};
});
...
GetStatusPublisher.cs 类中的以下代码
public class GetStatusPublisher : IGetStatusPublisher
{
readonly GetStatusBus _bus;
public GetStatusPublisher(GetStatusBus bus)
{
_bus = bus;
}
public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
{
var serviceAddress = new Uri($"rabbitmq://rabbitmq.test.com/jgt/getstatus");
var timeout = TimeSpan.FromSeconds(30);
var client = new MessageRequestClient<Tin, Tout>(_bus.Bus, serviceAddress, timeout);
var resp = await client.Request(request); // <== Timeout here and don't rise consumer (GetStatusConsumer)
return resp;
}
这里是发布方法:
...
readonly IGetStatusPublisher _getStatusPublisher;
...
var resp = await _getStatusPublisher.GetResponse<GetStatusRequest, GetStatusResponse>(statusReq);
消费者有以下代码:
public class GetStatusConsumer : MetricWriter, IConsumer<GetStatusRequest>
{
public GetStatusConsumer(IMetrics metrics) : base(metrics)
{
......
}
public async Task Consume(ConsumeContext<GetStatusRequest> context)
{
....
}
}
【问题讨论】:
标签: rabbitmq masstransit