【问题标题】:Reactive Extensions crash .net processReactive Extensions 崩溃 .net 进程
【发布时间】:2013-11-29 23:03:27
【问题描述】:

我开始在我的项目中使用响应式扩展并发现以下问题。我的班级中有一个主题,我需要所有客户端观察者在不同的线程中处理 OnNext 消息。

private Subject<IEnumerable<int>> _multiResponse =
    new Subject<IEnumerable<int>>();

public IObservable<IEnumerable<int>>> MultiResponse
{
     get
     {
         return _response.ObserveOn(TaskPoolScheduler.Default);
     }
}

如果客户端在 OnNext 事件中抛出异常,所有进程都会因为 Task 中未观察到的异常而崩溃。堆栈跟踪:

Scheduler.cs:第 149 行中的 ProcessResponses(IEnumerable 响应) 在 System.Reactive.AnonymousSafeObserver'1.OnNext(T 值) 在 System.Reactive.ScheduledObserver'1.Dispatch(ICancelable 取消) 在 System.Reactive.Concurrency.Scheduler.b_32(Action'1 a, ICancelable c) 在 System.Reactive.Concurrency.TaskPoolScheduler.c_DisplayClass7'1.b__6() 在 System.Threading.Tasks.Task.Execute()

这是 RX 中的预期行为还是错误以及如何处理?

【问题讨论】:

  • 异常的类型和信息是什么?这些通常比堆栈跟踪更重要。

标签: c# .net system.reactive


【解决方案1】:

好的,现在注意...这是关于未观察到的异常。

我认为您以异步方式做事这一事实并不重要。正如我所料,这个同步代码会抛出一个Exception

Observable.Return(1).Subscribe(
    _ => { throw new Exception(); });

然而,我希望找到下面的代码抛出一个Exception(从而解释你所看到的)但起初我很惊讶地发现它没有! p>

Observable.Return(1).ObserveOn(TaskPoolScheduler.Default).Subscribe(
    _ => { throw new Exception(); });

(编辑 - 见下文 - 这是 .NET 4.5 的行为)

当然切换到ThreadPoolScheduler 会得到预期的Exception

Observable.Return(1).ObserveOn(ThreadPoolScheduler.Instance).Subscribe(
    _ => { throw new Exception(); });

编辑:

是的 - 所以我不再偷懒,开始阅读源代码。事实证明,行为取决于您是否使用 .NET 4.5。

在 .NET 4.5 上,任务池的默认行为是不让未观察到的异常导致进程中断。在 .NET 4.0 上可以。因此,Rx 遵循平台默认设置并按照 .NET 4.0 上的其他调度程序运行,但在 .NET 4.5 上默默吞下异常。

因此,假设您使用的是 .NET 4.0,您所看到的是预期的 - 您有一个未观察到的异常。

还请注意,由于 .NET 4.5 是就地升级,这是一个重大更改。 重要的是计算机上安装了什么框架, 不是在项目属性中选择了什么框架。

相关的源代码在这里 - 值得一读,cmets 很有启发性:

http://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency/TaskPoolScheduler.cs

以下是 MSDN 上有关此行为更改的一些 cmets(请参阅备注),其中还显示了如何将 .NET 4.0 行为强加于 .NET 4.5 安装:

http://msdn.microsoft.com/en-us/library/jj160346(v=vs.110).aspx

我的建议是不要依赖这种行为,而是使用异常处理程序保护您的 OnNext 处理程序。需要明确的是,我的意思是订阅者不应该扔。 Rx 设计指南明确指出不应保护对 OnNext、OnError 和 OnCompleted 的调用。 (见第 6.4 节http://go.microsoft.com/fwlink/?LinkID=205219

【讨论】:

  • 我遇到了奇怪的行为,代码没有运行,突然崩溃,没有抛出异常。但是当我使用ObserveOn(TaskPoolScheduler.Default)时,它消失了,一切正常。我现在应该到处使用它吗?
  • @lolelo 这个“解决”您的问题的事实令人不安。我怀疑根本问题与 Rx 本身有关,我建议您进一步调查。
  • 在 xamarin 中,原来我需要在主线程上调用异步,同步会在旧设备上崩溃。它可能没有添加任何东西来做响应式扩展
  • @Iolelo 干得好,很高兴你找到了根本原因!
【解决方案2】:

詹姆斯世界感谢您的帮助。我正在使用 .Net 4.0,目前无法迁移到 4.5。 在我的任务中,我需要那个

public IObservable<IEnumerable<int>>> MultiResponse

将始终可供订阅者使用。为此,我创建了以下扩展方法:

public static class ObservableExtensions
{
    public static IObservable<T> ObserveSafe<T>(this IObservable<T> observable, Action<Exception> onError)
    {
        var subject = new Subject<T>();

        observable.Subscribe(x =>
                             {
                                 try
                                 {
                                     subject.OnNext(x);
                                 }
                                 catch (Exception exception)
                                 {
                                     onError(exception);
                                 }
                             },
                             exception =>
                             {
                                 try
                                 {
                                     subject.OnError(exception);
                                 }
                                 catch (Exception ex)
                                 {
                                     onError(ex);
                                 }
                                 finally
                                 {
                                     subject.Dispose();
                                 }
                             },
                             () =>
                             {
                                 try
                                 {
                                     subject.OnCompleted();
                                 }
                                 catch (Exception ex)
                                 {
                                     onError(ex);
                                 }
                                 finally
                                 {
                                     subject.Dispose();
                                 }
                             });

        return subject;
    }
}

【讨论】:

  • 我不是这种方法的忠实拥护者。请注意,您在此处立即订阅 - 甚至在单个下游订阅者之前。我能想到这样做的唯一原因是您不拥有观察者代码。作为一个小改进,考虑重构以通过将方法体移动到 Observable.Create 中来移动方法体(仅在订阅时调用)。如果可以的话,最好在观察者中处理异常。一般来说,无论如何你都不应该在观察者中做太多事情。你不想成为一个缓慢的消费者。
  • 一般来说,订阅 Observable 的调用和订阅其上游 Observable 的调用之间应该存在一一对应的关系,除非您正在执行显式的 Publish。另请参阅设计指南“订阅实现不应抛出”中的第 6.4 和 6.5 节:go.microsoft.com/fwlink/?LinkID=205219 该部分可能对您有用。请注意它如何特别建议不要保护对 OnNext、OnError 和 OnCompleted 的调用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多