【问题标题】:Exception Handling in RX FromEvent<T> methodRX FromEvent<T> 方法中的异常处理
【发布时间】:2014-11-17 09:32:00
【问题描述】:

如何处理从消息处理程序退订时引发的异常

var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
            handler => ((sender, e) => handler(e)),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler += a ),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler -= a));
        return rawSource;

在这段代码中,有时我会从 MessageHandler 抛出异常为“Illegalstateexception : {“Consumer is closed”}”

【问题讨论】:

    标签: c# exception-handling event-handling system.reactive reactive-programming


    【解决方案1】:

    事件通常不会抛出,因此可能是源头的错误行为。如果你能从源头上修复它,那就这样做吧。

    否则,您将不得不捕获并吞下错误:

    a => this._topicSubscribers.ForEach(s => 
    {
      try
      {
        s.MessageHandler += a;
      }
      catch
      {
      }
    })
    

    这可能并不理想,或者只是不使用FromEvent 方法:

    return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
    {
      EMSMessageHandler handler = (sender, e) => 
        observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e)));
    
      try
      {
        _topicSubscribers.ForEach(s => s.MessageHandler += handler);
      }
      catch (Exception ex)
      {
        try
        {
          _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
        }
        catch { }
    
        observer.OnError(ex);
      }
    
      return Disposable.Create(() =>
      {
        try
        {
          _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
        }
        catch { }
      });
    });
    

    请注意,Rx 需要序列化通知(Rx Design Guidelines 中的第 4.2 节),因此您必须确保所有_topicSubscribers 按顺序引发事件,而不是同时引发。如果不能,那么您必须自己同步所有对observer.OnNext 的调用,可能通过获取锁来实现。

    更新:需要明确的是,无论您使用FromEvent 还是Create,都需要序列化,因此即使您选择像我的第一个示例那样简单地吞下异常,您仍然会需要确保源永远不会同时引发事件;如果你不能,那么无论如何你都不得不使用我的Create 示例和锁。 FromEvent 不会为你这样做。

    【讨论】:

      【解决方案2】:

      像这样使用FromEvent 是在自找麻烦,因为 Dave 提到了 Rx 中需要序列化的所有原因。

      但是,假设事件不是在每个事件源中同时引发的(我相信 EMS MessageConsumer 就是这种情况),我会在 FromEvent 之后而不是在其中进行聚合,然后让 Rx 来做举重:

      var sources = new List<IObservable<EMSMessageEventArgs>();     
      
      foreach(var topicSubscriber in this._topicSubscribers.ToList())
      {
          var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
              handler => ((sender, e) => handler(e)),
              h => topicSubscriber.MessageHandler += h,
              h => topicSubscriber.MessageHandler -= h)
              .Synchronize();
      }
      
      rawSource = sources.Merge();
      

      这样Merge 将负责正确聚合和序列化各个源 - 但是,可能在各个事件中仍然存在并发。我实际上不认为FromEvent 会因在单个来源中同时引发的事件而受到压力。但是,Merge 可能不那么宽容,在这种情况下,使用上面的Sychronize() 可以确保在单个事件源级别以及跨事件源进行序列化。

      【讨论】:

      • 即使在单个事件注册中,调用者也有责任连续引发事件。 FromEvent 本身不会序列化通知。
      • 不,它没有,但它也没有坏……至少我把它弄坏了,它没有弯曲(很多操作员实际上可以容忍这一点)。因此,我怀疑使用Synchronize 确实可以解决我认为的问题。
      • @DaveSexton Btw - 你知道不需要连续引发 .NET 事件(在 Rx 之外)吗?我在这里广泛研究了这一点:*.com/questions/24572366/…
      • 坦率地说,如果+=-= 确实抛出异常,那么该死的世界应该会在你耳边翻滚!我不想处理那个错误! :)
      • 具体来说,我正在考虑竞争条件不可避免的网络场景。有时您不希望应用程序因为ObjectDisposedException 而崩溃。相反,您只需将 UI 状态转换为“断开连接”或“无法连接”。