【问题标题】:Message handler not activating消息处理程序未激活
【发布时间】:2016-05-11 13:21:13
【问题描述】:

我遇到了一些关于 rebus 的问题。

这是我的场景。 我们提供三项服务

Identity) 发布了“IdentityCreated”消息

网关)将“UpdateProfileCommand”直接发送到“profile-westeu-input”队列中

Profile) 使用来自输入队列“profile-westeu-input”的消息并订阅“IdentityCreated”消息

Profile Service

中看到的rebus配置

鉴于我已经在温莎城堡注册了我的处理程序。

container.Register(Classes.FromThisAssembly()
                  .BasedOn(typeof(IHandleMessages<>))
                  .WithServiceAllInterfaces()
                  .LifestyleTransient());

我用 Rebus 配置了

var bus = Configure.With(new CastleWindsorContainerAdapter(container))
            .Logging(x => x.Trace())
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: ProfileInputQueueName, mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy(ProfileErrorQueueName))
            .Start();

并订阅了这样的消息类型

bus.Subscribe(typeof(Nabufit.Messages.Identity.Events.IdentityCreated)).Wait()

我预计我的处理程序会被自动调用。但是它没有:(。

我尝试了不同的解决方案

  • 更改了输入队列的名称
  • 创建了一个事件发射器程序,该程序发布了一个“IdentityCreated”类型的事件。在输入队列中查看时,它存在,但不会被 rebus 拾取。

奖金信息:

  • 使用 azure 服务总线
  • 在 Service Fabric 应用程序中托管 Rebus
  • 我的输入队列名为“profile-westeu-input”

【问题讨论】:

  • 会发生什么?什么都没有发生吗?还是你得到一个异常说消息无法发送给任何处理程序?
  • 什么都没有。我从 rebus 获得的最新信息是它已经启动了 1 名工人,然后从那里开始沉默。我可以看到当前版本已经有一段时间没有更新 WindowsAzure.ServiceBus 包了。我已经在本地更新了包,但似乎没有解决问题。但是,您会尝试更新到最新版本并运行测试吗?
  • 抬头。我们正在运行 .net 4.6.1,它与 rebus.azureservicebus 中建议的 WindowsAzure.ServiceBus 包版本存在一些问题。 stackoverflow.com/questions/34329056/…
  • 您能否启动 Azure 服务总线资源管理器并查看主题?
  • 我目前不在办公室,但如果我没记错的话,它在消息类型下列出,订阅者路由到 profile_westeu_input?

标签: azureservicebus azure-service-fabric rebus


【解决方案1】:

在调查该应用程序后,我们发现我们在 OwinCommunicationListener 中的 Webapi 之间共享了 Windsor Container,它具有一些自定义的生命周期依赖项配置。这导致了两个不同的错误。

  1. 由于容器配置,拒绝接收事件
  2. 从架构上讲,它不智能地与消费进程共享同一个容器

我们最终使用 rebus 提供的 BuiltinHandlerActivation 类构建了一个自定义的 ICommunicationListener 特定于总线消费过程。看起来像这样。

 public class ServiceBusCommunicationListener : ICommunicationListener
{
    private BuiltinHandlerActivator activator;

    public async Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        activator = new BuiltinHandlerActivator();
        RegisterHandlers(activator);

        var connectionString = "...";
        var bus = Configure.With(activator)
            .Logging(x => x.Serilog(Log.Logger))
            .Transport(
                t => t.UseAzureServiceBus(connectionStringNameOrConnectionString: connectionString,
                        inputQueueAddress: "input", mode: AzureServiceBusMode.Standard))
            .Options(o => o.SimpleRetryStrategy("error"))
            .Start();

        return connectionString;
    }

    private void RegisterHandlers(BuiltinHandlerActivator builtinHandlerActivator)
    {
        (...)
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        if (activator != null)
            activator.Dispose();
    }

    public void Abort()
    {
        if (activator != null)
            activator.Dispose();
    }
}

并将 ServicebusCommunicationListner 注册为 ServiceInstanceListener。

internal sealed class ProfileService : StatelessService
{
    public ProfileService(StatelessServiceContext context)
        : base(context)
    { }

    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(context => new ServiceBusCommunicationListener()), 
        };
    }
}

【讨论】:

  • 这在 Rebus 托管在 Stateless 服务中时会起作用,但是当 Rebus 托管在 Statefull 服务中时,最好在 Statefull 服务的 Run 方法中配置和启动 Rebus。原因是在 Statefull 服务生命周期中,通信侦听器是在服务的 Open 和 Run 方法执行之间创建的。在执行 Run 方法之前,Service Fabric 不保证 StateManager 处于有效状态,因此在执行 Run 方法之前,任何触发并期望在 StateManager 上工作的处理程序都可能无法工作。
猜你喜欢
  • 1970-01-01
  • 2014-02-26
  • 2020-03-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-23
  • 2010-11-26
相关资源
最近更新 更多