【问题标题】:The difference between Rx Throttle(...).ObserveOn(scheduler) and Throttle(..., scheduler)Rx Throttle(...).ObserveOn(scheduler) 和 Throttle(..., scheduler) 的区别
【发布时间】:2015-05-03 04:13:31
【问题描述】:

我有以下代码:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

正如预期的那样,UpdateUi() 将始终在主线程上执行。当我将代码更改为

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50))
                                       .ObserveOn(RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

UpdateUI() 将在后台线程中执行。

为什么Throttle(...).ObserveOn(scheduler) 不等于Throttle(..., scheduler)

【问题讨论】:

    标签: c# system.reactive reactiveui


    【解决方案1】:

    在你给UpdateUi 的代码中的两个示例中,总是在RxApp.MainThreadScheduler 指定的调度程序上调用。我可以肯定地说这一点,因为ObserveOn 是一个装饰器,可确保在指定的调度程序上调用订阅者的OnNext 处理程序。请参阅here 进行深入分析。

    这么说,这有点令人费解。 RxApp.MainThreadScheduler 不是指正确的调度程序调度程序,或者UpdateUi 正在从调度程序线程转换。前者并非史无前例 - 请参阅 https://github.com/reactiveui/ReactiveUI/issues/768 其他人遇到过的情况。我不知道在那种情况下问题是什么。也许@PaulBetts 可以参与进来,或者您可以在https://github.com/reactiveui/ 提出问题。无论如何,我会在这里仔细检查您的假设,因为我希望这是一个经过良好测试的领域。你有完整的复制品吗?

    至于你的具体问题,Throttle(...).ObserveOn(scheduler)Throttle(..., scheduler)的区别如下:

    在第一种情况下,当 Throttle 指定时没有调度程序,它将使用默认平台调度程序来引入运行它的计时器所需的并发性 - 在 WPF 上,这将使用线程池线程。所以所有的限制都将在后台线程上完成,并且由于以下ObserveOn,释放的事件只会传递给指定调度程序上的订阅者。

    Throttle 指定调度程序的情况下,在该调度程序上进行限制 - 抑制事件和释放事件都将在该调度程序上进行管理,并且订阅者也将在同一调度程序上调用。

    因此,无论哪种方式,UpdateUi 都会在 RxApp.MainThreadScheduler 上被调用。

    在大多数情况下,最好不要在调度程序上限制 ui 事件,因为如果只有一小部分事件会通过限制,在后台线程上运行单独的计时器并支付上下文切换的成本通常会更高。

    所以,为了检查您没有遇到RxApp.MainThreadScheduler 的问题,我会尝试通过另一种方式明确指定调度程序或SynchronizationContext。如何做到这一点取决于您所在的平台 - ObserveOnDispatcher() 希望可用,或使用合适的 ObserveOn 重载。如果导入了正确的 Rx 库,则可以选择控件、同步上下文和调度程序。

    【讨论】:

    【解决方案2】:

    经过一番调查,我认为这是由于运行时使用的 Rx 版本与我预期的不同(我为第三方应用程序开发了一个插件)。

    我不知道为什么,但似乎默认的RxApp.MainThreadScheduler 无法正确初始化。默认实例是WaitForDispatcherScheduler (source)。这个类的所有函数都依赖attemptToCreateScheduler:

        IScheduler attemptToCreateScheduler()
        {
            if (_innerScheduler != null) return _innerScheduler;
            try {
                _innerScheduler = _schedulerFactory();
                return _innerScheduler;
            } catch (Exception) {
                // NB: Dispatcher's not ready yet. Keep using CurrentThread
                return CurrentThreadScheduler.Instance;
            }
        }
    

    在我的情况下似乎发生的是 _schedulerFactory() 抛出,导致 CurrentThreadScheduler.Instance 被返回。

    通过手动将RxApp.MainThreadScheduler 初始化为new SynchronizationContextScheduler(SynchronizationContext.Current) 行为符合预期。

    【讨论】:

      【解决方案3】:

      我刚刚遇到一个问题,首先导致我提出这个问题,然后进行了一些实验。

      事实证明,Throttle(timeSpan, scheduler) 足够聪明地“取消”已经安排好的去抖事件X,以防源发出另一个事件Y 之前 X 被观察到。因此,最终只会观察到Y(假设它是最后一个去抖事件)。

      使用Throttle(timeSpan).ObserveOn(scheduler)XY 都将被观察到。

      因此,从概念上讲,这是两种方法之间的重要区别。遗憾的是,Rx.NET 文档很少,但我相信这种行为是设计使然,对我来说很有意义。

      用一个例子来说明这一点 (fiddle):

      #nullable enable
      using System;
      using System.Threading;
      using System.Threading.Tasks;
      using System.Diagnostics;
      using System.Reactive.Concurrency;
      using System.Reactive.Linq;
      using System.Reactive.Subjects;
      using static System.Console;
      
      public class Program
      {
          static async Task ThrottleWithScheduler()
          {
              WriteLine($"\n{nameof(ThrottleWithScheduler)}\n");
      
              var sc = new CustomSyncContext();
              var scheduler = new SynchronizationContextScheduler(sc);
              var subj = new BehaviorSubject<string>("A");
      
              subj
                  .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
                  .Throttle(TimeSpan.FromMilliseconds(500), scheduler)
                  .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));
      
              await Task.Delay(100);
              subj.OnNext("B");
              await Task.Delay(200);
              subj.OnNext("X");
              await Task.Delay(550);
              subj.OnNext("Y");
      
              await Task.Delay(2000);
              WriteLine("Finished!");
          }
      
          static async Task ThrottleWithObserveOn()
          {
              WriteLine($"\n{nameof(ThrottleWithObserveOn)}\n");
      
              var sc = new CustomSyncContext();
              var scheduler = new SynchronizationContextScheduler(sc);
              var subj = new BehaviorSubject<string>("A");
      
              subj
                  .Do(v => WriteLine($"Emitted {v} on {sc.Elapsed}ms"))
                  .Throttle(TimeSpan.FromMilliseconds(500))
                  .ObserveOn(scheduler)
                  .Subscribe(v => WriteLine($"Observed {v} on {sc.Elapsed}ms"));
      
              await Task.Delay(100);
              subj.OnNext("B");
              await Task.Delay(200);
              subj.OnNext("X");
              await Task.Delay(550);
              subj.OnNext("Y");
      
              await Task.Delay(2000);
              WriteLine("Finished!");
          }
      
          public static async Task Main()
          {
              await ThrottleWithScheduler();
              await ThrottleWithObserveOn();
          }
      }
      
      class CustomSyncContext : SynchronizationContext
      {
          private readonly Stopwatch _sw = Stopwatch.StartNew();
          public long Elapsed { get { lock (_sw) { return _sw.ElapsedMilliseconds; } } }
          public override void Post(SendOrPostCallback d, object? state)
          {
              WriteLine($"Scheduled on {Elapsed}ms");
              Task.Delay(100).ContinueWith(
                  continuationAction: _ =>
                  {
                      WriteLine($"Executed on {Elapsed}ms");
                      d(state);
                  },
                  continuationOptions: TaskContinuationOptions.ExecuteSynchronously);
          }
      }
      

      输出:

      ThrottleWithScheduler
      
      Emitted A on 18ms
      Emitted B on 142ms
      Emitted X on 351ms
      Scheduled on 861ms
      Emitted Y on 907ms
      Executed on 972ms
      Scheduled on 1421ms
      Executed on 1536ms
      Observed Y on 1539ms
      Finished!
      
      ThrottleWithObserveOn
      
      Emitted A on 4ms
      Emitted B on 113ms
      Emitted X on 315ms
      Scheduled on 837ms
      Emitted Y on 886ms
      Executed on 951ms
      Observed X on 953ms
      Scheduled on 1391ms
      Executed on 1508ms
      Observed Y on 1508ms
      Finished!
      

      【讨论】:

        猜你喜欢
        • 2018-06-11
        • 2016-12-20
        • 2011-07-25
        • 1970-01-01
        • 2015-01-02
        • 2016-12-07
        • 2014-06-07
        • 2016-06-10
        • 1970-01-01
        相关资源
        最近更新 更多