【问题标题】:How can I stop MassTransit creating exchange bindings for error messages如何停止 MassTransit 为错误消息创建交换绑定
【发布时间】:2015-12-29 19:39:44
【问题描述】:

我正在尝试侦听错误队列以处理失败的消息,但我似乎无法让 MassTransit 不对我希望它在配置中侦听的消息设置绑定。配置如下,使用的是 MassTransit v3:

var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
  var host = configurator.Host(hostAddress, h =>
  {
    h.Username(username);
    h.Password(password);
  });

  configurator.ReceiveEndpoint(host, "myqueue_error",
  endpointConfigurator =>
  {
    endpointConfigurator.Handler<SomeMessage>(context =>
    {
      return Console.Out.WriteLineAsync("Woop");
    });
  });
});

在上面的示例中,它将为发布SomeMessage 的任何内容设置绑定,并将它们定向到myqueue_error,我只希望消息进入此队列,该队列已从失败的服务转发。有没有办法使用队列中的消息,但告诉 MassTransit 不要为它们绑定?

更新 - 潜在的解决方案

似乎我不需要设置 ReceiveEndpoint,但我可以重命名控制总线以接受我关心的消息,然后这将能够处理这些消息,而无需为消息创建交换绑定。

以下是修改后的代码,不确定这是否是一种理想的方式,但它有效

var hostAddress = new Uri("rabbitmq://localhost/");
var username = "guest";
var password = "guest";

_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
{
  configurator.Host(hostAddress, h =>
  {
    h.Username(username);
    h.Password(password);
  });

  // We need to make the queue look like the error queue
  configurator.BusQueueName = $"{_queue}_error";
  configurator.Durable = true;
  configurator.AutoDelete = false;
  configurator.SetQueueArgument("x-expires", null);
});

var connectHandle = _busControl.ConnectHandler<SomeMessage>(context => Console.Out.WriteLineAsync("Woop"));

_busHandle = _busControl.Start();

_busHandle.Ready.Wait();

// Wait

// Clean up

connectHandle.Disconnect();
_busHandle.Stop

【问题讨论】:

    标签: c# .net rabbitmq masstransit message-bus


    【解决方案1】:

    通过大量挖掘,我找到了一个更好的解决方案,但我完全错过了文档。

    似乎我们可以通过订阅消费者收听错误来收听消息这非常适合我一直在努力实现的目标,我们还可以保持错误队列的正常运行。 http://docs.masstransit-project.com/en/mt3/usage/exceptions.html#handling-exceptions

    因此,我解决的最后一点配置如下:

        var hostAddress = new Uri("rabbitmq://localhost/");
        var username = "guest";
        var password = "guest";
    
        _busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(configurator =>
        {
          var host = configurator.Host(hostAddress, h =>
          {
            h.Username(username);
            h.Password(password);
          });
    
          configurator.ReceiveEndpoint(host, "error_listener",
          endpointConfigurator =>
          {
            endpointConfigurator.Handler<Fault<SomeMessage>>(context =>
            {
              return Console.Out.WriteLineAsync("Woop");
            });
          });
        });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-01
      • 1970-01-01
      • 2018-12-12
      • 2015-06-26
      • 2017-05-09
      相关资源
      最近更新 更多