【发布时间】:2020-05-14 18:00:31
【问题描述】:
我是 MassTransit 的新手,不确定该如何配置它。有没有办法把已消费的消息返回到我调用 Consume 方法的地方?Consume 方法返回的是 IBusControl,但我无法从中获取消息。
实际的 Consume 方法是通过 ReceiveEndpoint 以某种方式调用的,并且此 Consume 方法返回一个保存消息的 Task。所以我的问题是,如何将此消息返回到 ConsumeMessage 方法,以便我可以使用结果填充 InventoryItemObject。
/// <summary>
/// Creates a <c>MassTransitConsumer</c>, asks it to consume from the queue named by
/// <paramref name="domainId"/>, blocks until the returned bus's start task completes,
/// and then returns a new <c>InventoryItemObject</c>.
/// </summary>
/// <param name="domainId">Queue name forwarded to the consumer's <c>Consume</c> method.</param>
/// <returns>
/// A freshly constructed <c>InventoryItemObject</c>. It is not populated from any received
/// message: nothing in this method reads a message from the bus.
/// </returns>
/// <exception cref="Exception">
/// Thrown when creating or starting the consumer fails. The message is copied from the
/// original failure and the original exception is available as <c>InnerException</c>.
/// </exception>
public IInventory ConsumeMessage(string domainId)
{
    using (consumer = new MassTransitConsumer(MassTransitConsumer.GetConfiguration()))
    {
        try
        {
            // Block until the start task finishes. The task's result is intentionally not kept.
            // NOTE(review): it is presumably a bus handle, not a consumed message — confirm.
            consumer.Consume(domainId).StartAsync().Wait();
        }
        catch (Exception ex)
        {
            // Keep the original exception as InnerException so its type and stack trace
            // are not lost; the outer type and message are the same as before.
            throw new Exception(ex.Message, ex);
        }
    }
    return new InventoryItemObject();
}
/// <summary>
/// Creates and starts a RabbitMQ-backed bus with a single receive endpoint on
/// <paramref name="queue"/>, then returns the <c>bus</c> field.
/// </summary>
/// <param name="queue">Name of the queue the receive endpoint is bound to.</param>
/// <returns>
/// The current value of the <c>bus</c> field. It is only reassigned when the configured
/// queue type is "RabbitMQ" or is null/empty and bus creation succeeds.
/// </returns>
/// <remarks>
/// Exceptions raised while creating or starting the bus are logged and not rethrown.
/// </remarks>
public IBusControl Consume(string queue)
{
    // Built outside the try block, so an invalid address surfaces to the caller.
    // The resulting Uri is not used afterwards.
    var endpointUri = new Uri(string.Concat(configuration.HostAddress, "/", queue));

    try
    {
        var queueType = configuration.QueueType;
        var useRabbitMq = string.IsNullOrEmpty(queueType) || queueType == "RabbitMQ";
        if (!useRabbitMq)
        {
            // Any other queue type: leave the bus field untouched.
            return bus;
        }

        bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            IRabbitMqHost rabbitHost = cfg.Host(configuration.HostAddress, credentials =>
            {
                credentials.Username(configuration.Username);
                credentials.Password(configuration.Password);
            });

            cfg.ReceiveEndpoint(rabbitHost, queue, endpointCfg =>
            {
                endpointCfg.UseConcurrencyLimit(1);
                endpointCfg.UseRetry(retry => retry.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5)));
                endpointCfg.Consumer(() => new InventoryConsumer(_connectionString));
            });
        });

        bus.Start();
    }
    catch (Exception ex)
    {
        Logger.LogError(ex.Message);
    }

    return bus;
}
/// <summary>
/// Handles an incoming <c>IInventory</c> message: logs it as indented JSON, passes it to
/// <c>SetMessageAsConsumed</c>, and logs how long that call took.
/// </summary>
/// <param name="context">Consume context whose <c>Message</c> is the inventory payload.</param>
/// <returns>An already-completed task wrapping the message that was handled.</returns>
public Task Consume(ConsumeContext<IInventory> context)
{
    var payload = context.Message;

    _logger.LogDebug($"Order object: {JsonConvert.SerializeObject(payload, Formatting.Indented)}");
    _logger.LogDebug("Stating timer for UserChangeEventConsumer:");

    // Time only the call that marks the message as consumed.
    var stopwatch = Stopwatch.StartNew();
    SetMessageAsConsumed(payload);
    stopwatch.Stop();

    _logger.LogDebug($"UserChangeEventConsumer completed in {stopwatch.Elapsed.TotalSeconds} seconds.");

    return Task.FromResult(payload);
}
【问题讨论】:
标签: .net rabbitmq masstransit