【问题标题】:Catching events prior to subscription with FromEventPattern使用 FromEventPattern 在订阅之前捕获事件
【发布时间】:2018-04-02 22:48:53
【问题描述】:

我正在使用 Rx 框架编写消息侦听器。

我面临的问题是,我正在使用的库使用了一个消费者,该消费者在消息到达时发布事件。

我已经设法通过Observable.FromEventPattern 使用传入的消息,但我对服务器中已经存在的消息有疑问。

目前我有以下命令链

  1. 创建消费者
  2. 使用FromEventPattern 创建一个可观察序列并应用所需的转换
  3. 告诉消费者开始
  4. 订阅序列

最简单的解决方案是交换步骤 3 和 4。但由于它们发生在系统的不同组件中,我很难这样做。

理想情况下,我希望在第 4 步发生时执行第 3 步(例如 OnSubscribe 方法)。

感谢您的帮助:)

PS:添加更多细节,事件来自 RabbitMQ 队列,我使用的是 RabbitMQ.Client 包中的 EventingBasicConsumer 类。

Here 你可以找到我正在开发的库。具体来说,this 是给我带来问题的类/方法。

编辑

这是有问题的代码的剥离版本

void Main()
{
    var engine = new Engine();

    var messages = engine.Start();

    messages.Subscribe(m => m.Dump());

    Console.ReadLine();

    engine.Stop();
}

public class Engine
{
    IConnection _connection;
    IModel _channel;

    public IObservable<Message> Start()
    {
        var connectionFactory = new ConnectionFactory();

        _connection = connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();

        EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);

        var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
                                        a => consumer.Received += a, 
                                        a => consumer.Received -= a)
                                    .Select(e => e.EventArgs);

        _channel.BasicConsume("a_queue", false, consumer);

        return observable.Select(Transform);
    }

    private Message Transform(BasicDeliverEventArgs args) => new Message();

    public void Stop()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

public class Message { }

我遇到的症状是,由于我在订阅序列之前调用了 BasicConsume,所以 RabbitMQ 队列中的任何消息都会被获取但不会沿管道传递。

由于我没有启用“autoack”,因此程序一停止,消息就会返回到队列中。

【问题讨论】:

  • 您的意思是您因为订阅太晚而错过了一些消息?
  • 没错!我应该解决这个问题吗?
  • 这部分似乎很清楚,但正如您自己所说的那样 - 最合理的做法是在开始之前订阅。为什么你不能在控制所有代码的同时做到这一点对我来说不是很清楚。也许只是将可选订阅回调传递给Start?您也可以使用Replay(),但它会缓存所有消息,这可能不是一个好主意(取决于这些消息的数量)。
  • 我没有想到回调,但我想知道是否有解决问题的Rx方法。至于拆分,Host 和 Engine 职责不同,混在一起不好。
  • “重播”是不行的。我正在查看“发布”,但我不确定我是否了解它的工作原理以及应该在哪里使用它。

标签: c# .net rabbitmq system.reactive


【解决方案1】:

正如一些人在 cmets 中指出的那样,正如您在问题中指出的那样,问题是由于您使用 RabbitMQ 客户端的方式造成的。

为了解决其中的一些问题,我实际上是创建了一个 ObservableConsumer 类。这是当前正在使用的 EventingBasicConsumer 的替代方案。我这样做的一个原因是为了处理问题中描述的问题,但这样做的另一件事是允许您在单个连接/通道实例之外重用这个消费者对象。这样做的好处是,尽管存在瞬态连接/通道特性,您的下游反应式代码仍能保持连线。

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;

namespace com.rabbitmq.consumers
{
    public sealed class ObservableConsumer : IBasicConsumer
    {
        private readonly List<string> _consumerTags = new List<string>();
        private readonly object _consumerTagsLock = new object();
        private readonly Subject<Message> _subject = new Subject<Message>();

        public ushort PrefetchCount { get; set; }
        public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }

        /// <summary>
        /// Registers this consumer on the given queue. 
        /// </summary>
        /// <returns>The consumer tag assigned.</returns>
        public string ConsumeFrom(IModel channel, string queueName)
        {
            Model = channel;
            return Model.BasicConsume(queueName, false, this);
        }

        /// <summary>
        /// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
        /// </summary>
        public IObservable<Message> IncomingMessages
        {
            get { return _subject.ObserveOn(Scheduler.ThreadPool); }
        }

        ///<summary>Retrieve the IModel instance this consumer is
        ///registered with.</summary>
        public IModel Model { get; private set; }

        ///<summary>Returns true while the consumer is registered and
        ///expecting deliveries from the broker.</summary>
        public bool IsRunning
        {
            get { return _consumerTags.Count > 0; }
        }

        /// <summary>
        /// Run after a consumer is cancelled.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerCanceled(string consumerTag)
        {

        }

        /// <summary>
        /// Run after a consumer is added.
        /// </summary>
        /// <param name="consumerTag"></param>
        private void OnConsumerAdded(string consumerTag)
        {

        }

        public void HandleBasicConsumeOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (!_consumerTags.Contains(consumerTag))
                    _consumerTags.Add(consumerTag);
            }
        }

        public void HandleBasicCancelOk(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleBasicCancel(string consumerTag)
        {
            lock (_consumerTagsLock) {
                if (_consumerTags.Contains(consumerTag)) {
                    _consumerTags.Remove(consumerTag);
                    OnConsumerCanceled(consumerTag);
                }
            }
        }

        public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
        {
            //Don't need to do anything.
        }

        public void HandleBasicDeliver(string consumerTag,
                                       ulong deliveryTag,
                                       bool redelivered,
                                       string exchange,
                                       string routingKey,
                                       IBasicProperties properties,
                                       byte[] body)
        {
            //Hack - prevents the broker from sending too many messages.
            //if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
            //    Model.BasicReject(deliveryTag, true);
            //    return;
            //}

            var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
            var deliveryData = new MessageDeliveryData()
            {
                ConsumerTag = consumerTag,
                DeliveryTag = deliveryTag,
                Redelivered = redelivered,
            };

            message.Tag = deliveryData;

            if (AckMode != AcknowledgeMode.AckWhenReceived) {
                message.Acknowledged += messageAcknowledged;
                message.Failed += messageFailed;
            }

            _subject.OnNext(message);
        }

        void messageFailed(Message message, Exception ex, bool requeue)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
                }
            }
            catch {}
        }

        void messageAcknowledged(Message message)
        {
            try {
                message.Acknowledged -= messageAcknowledged;
                message.Failed -= messageFailed;

                if (message.Tag is MessageDeliveryData) {
                    var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
                    Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
                }
            }
            catch {}
        }
    }
}

【讨论】:

  • 谢谢!这个完全符合我的要求!我不得不将主题更改为 ReplaySubject。否则你的解决方案就像一个魅力。
  • ReplaySubject 具有您可能不喜欢的特定行为...
【解决方案2】:

我认为没有必要实际订阅兔子队列(通过BasicConsume),直到您订阅了您的 observable。现在,您将立即开始订阅 rabbit 并将项目推送到 observable,即使没有人订阅它。

假设我们有这个示例类:

class Events {
    public event Action<string> MessageArrived;

    Timer _timer;
    public void Start()
    {
        Console.WriteLine("Timer starting");
        int i = 0;
        _timer = new Timer(_ => {
            this.MessageArrived?.Invoke(i.ToString());
            i++;
        }, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
    }

    public void Stop() {
        _timer?.Dispose();
        Console.WriteLine("Timer stopped");
    }
}

你现在做的基本上是:

var ev = new Events();
var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);               
ev.Start();    
return ob;

你需要的是 observable ,它确实做到了这一点,但只有当有人订阅时:

return Observable.Create<string>(observer =>
{
    var ev = new Events();
    var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
    // first subsribe
    var sub = ob.Subscribe(observer);
    // then start
    ev.Start();
    // when subscription is disposed - unsubscribe from rabbit
    return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
}); 

很好,但是现在每次订阅 observable 都会导致单独订阅兔子队列,这不是我们需要的。我们可以通过Publish().RefCount() 解决这个问题:

return Observable.Create<string>(observer => {
    var ev = new Events();
    var ob = Observable.FromEvent<string>(x => ev.MessageArrived += x, x => ev.MessageArrived -= x);
    var sub = ob.Subscribe(observer);                    
    ev.Start();                
    return new CompositeDisposable(sub, Disposable.Create(() => ev.Stop()));
}).Publish().RefCount(); 

现在会发生什么情况是当第一个订阅者订阅 observable(引用计数从 0 到 1)时 - 来自 Observable.Create 正文的代码被调用并订阅兔子队列。该订阅随后由所有后续订阅者共享。当最后一次取消订阅时(引用计数归零) - 订阅被释放,ev.Stop 被调用,我们从兔子队列中取消订阅。

如果发生这种情况,您调用 Start()(在您的代码中创建 observable)并且从不订阅它 - 没有任何反应,并且根本没有订阅 rabbit。

【讨论】:

  • 感谢@Evk 的回答。我今晚回家后会检查并通知您!
  • 这段代码很脆弱,因为 RabbitMQ 消费者是一个瞬态对象。当消费者断​​开连接时会发生什么?
  • 但这只是一个针对特定问题的示例(关于导致丢失消息的延迟订阅),而不是强大的 rabbitmq 侦听器实现。无论如何,如果消费者断开连接 - 最自然的事情似乎结束了可观察的流。
  • 这个答案提醒我,我需要努力将 RabbitMQ .NET 客户端的重新实现放到 github 上,以便其他人可以使用它......你可以帮助它变得更好。 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-20
  • 2020-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-17
相关资源
最近更新 更多