【问题标题】:Turning a EasynetQ subscriber into an observable将 EasynetQ 订阅者变成 observable
【发布时间】:2018-03-14 17:53:18
【问题描述】:

我在使用 RabbitMQ 时使用 EasyNetQ 作为客户端库。要创建一个订阅者,请喜欢

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

当发布 MyMessage 的实例时,EasyNetQ 将调用委托并将消息的 Text 属性打印到控制台。

我如何把它变成一个可观察的序列?我一直在研究 Observable.CreateObservable.Generate 方法,但我不知道如何连接 RabbitMQ 消费者和可观察序列。

Subscribe 方法返回一个IDisposable,因此解决方案应该尊重这一点,以便可以正确处置资源。

我注意到了这个solution,但大多数人似乎建议不要使用Subject,所以我想找到另一个解决方案。

欢迎任何提示或想法。

【问题讨论】:

    标签: c# rabbitmq reactive-programming system.reactive easynetq


    【解决方案1】:

    这行得通吗?

    var observable = Observable.Create<MyMessage>(o => 
        bus.Subscribe<MyMessage>("my_subscription_id", msg => o.OnNext(msg))
    );
    

    我对引用代码中使用Subject 的方式没有意见:如果Subject 是一个私有(理想情况下为只读)字段,仅作为IObservable 公开,那么它的影响是包含在内的.但是,该引用链接处的代码还有其他问题:

    • 如果有多个.Connect() 调用,那么就会有多个订阅,只有一个会被释放。这可能会导致泄漏(不知道 EasyNetQ 订阅是如何实现的)。

    • 最好不要实现IObservable(或IConnectableObservable),因为很容易犯上述错误。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-12-03
      • 2014-10-23
      • 1970-01-01
      • 2020-08-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多