【问题标题】:Throttling IObservable causes observer not being invoked限制 IObservable 导致未调用观察者
【发布时间】:2014-02-26 23:43:13
【问题描述】:

我有这个简单的代码,其中:

  • 创建 IObservable
  • 采样半秒
  • 使用 ThreadPool 调度程序订阅它
  • 使用 SynchronizationContext 观察它

代码如下:

private void DisplayPoints()
{
    var x = 0;
    var ob = this.GeneratePoints();
    ob
      .Sample(TimeSpan.FromMilliseconds(500))
      .SubscribeOn(ThreadPoolScheduler.Instance)
      .ObserveOn(SynchronizationContext.Current)
      .Subscribe(d => Console.WriteLine(d));
}

private IObservable<double> GeneratePoints()
{
    return Observable.Create<double>(o => this.GeneratePoints(o));
}

private IDisposable GeneratePoints(IObserver<double> observer)
{
    var i = 0;
    while (true)
    {
        var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));

        observer.OnNext(value);

        i++;
    }

    return Disposable.Empty;
}

但是,控制台上不会输出任何内容(即永远不会调用匿名观察者)。如果我删除 Sample 运算符,则会调用观察者,尽管这种行为显然不是预期的(UI 线程将被轰炸)。

我显然在这里遗漏了一些东西。我的目的是生成数据,通过 IObserver 推送数据,并通过 UI 显示其中的一些数据。

编辑:由于有些人误解了我的意图(尽管上面已经明确说明),我应该重申一下,我正在尝试做的事情:

  • 使用算法生成一些数据(double 值似乎足以解决我的问题)
  • 在 GUI 中显示数据

使用IObservable 和响应式扩展似乎是解决我的问题的好方法。

再重复一遍:不会在实际代码中返回随机数 - 这只是一个占位符 让我的预期行为发挥作用。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    您可能不想在紧密循环中生成随机数。最好使用时间间隔。下面每 200 毫秒生成一次随机数。

    IObservable<double> observable =
         Observable.Interval(TimeSpan.FromMillSeconds(200))
              .Select((t,i) => random.Next(0, 100) 
                          * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))))
    

    Enigmativity 为您编写的代码实际上也是紧密循环的。他关于您在订阅过程中推出价值的错误的观点也是正确的。您必须对代码进行的最小更改才能使其正常工作。

        private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
        {
            return Task.Run(() =>
            {
                var i = 0;
                var random = new Random();
                while ( true )
                {
                    token.ThrowIfCancellationRequested();
    
                    var value = random.Next(0, 100) * ( 1 / ( double ) random.Next(1, Math.Min(50, Math.Max(i, 1))) );
    
                    observer.OnNext(value);
    
                    i++;
                }
            });
        }
    

    稍后

        Observable.Create<double>((observer, token) => GeneratePoints(observer, token));
    

    注意正在传递的取消令牌。当序列的订阅者取消订阅时,将设置此令牌并终止循环。

    然而,这是很多工作,Enigmativities 的答案更简单,并为您抽象出上述代码。对于更复杂的情况,了解如何手动执行此操作仍然很有用。

    【讨论】:

    • 为什么要在值之间引入延迟?
    • OP 询问了关于节流的问题,这在没有时间间隔的事件范围内不太有意义。我想他可能会尝试在两者之间延迟发送事件。如果您确实想要转储事件,您的代码是完全正确的。在我自己回答之前,我投票给你;)
    • 看起来 OP 确实引入了延迟,但是向可观察源添加延迟可能会引入无意的副作用。在这里可能没什么大不了的,但通常很难调试这种东西。
    • 当然。您必须确切地询问用例是什么,并且我们没有足够的上下文,除非知道 OP 并不真正了解基本的 RX 组合器。
    • @bradgonesurfing - 顺便说一下,Observable.Generate 有一个重载,可以将每次迭代延迟 Timespan 或特定的 DateTime - 它甚至会获取状态,因此您可以在每次迭代时改变它.此外,Observable.Generate 确实在每次迭代时都让给调度程序(每次后续迭代都被调度为前面的完成),因此紧密的循环不会导致调度程序饥饿。
    【解决方案2】:

    我怀疑您的问题与 Throttle 在内部通过 DefaultScheduler.Instance 引入并发性以及您对 IDisposable GeneratePoints(IObserver&lt;double&gt; observer) 的实现是非标准的事实有关。

    尝试像这样重新实现IObservable&lt;double&gt; GeneratePoints()

    private IObservable<double> GeneratePoints()
    {
        return Observable.Generate<int, double>(
            0,
            i => true,
            i => i + 1,
            i => random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));
    }
    

    这可能会有所帮助。

    问题来自您的 observable 在订阅过程中直接推出值。在创建 observable 时,您应该始终尝试使用标准的内置运算符。上面的代码使用了内置的Generate 操作符,所以它应该和你的代码配合得更好。

    【讨论】:

    • Observable.Generate 似乎是创建我的IObservable 的更好方法。我不喜欢这段代码的“意大利面”(尽管添加新行确实有帮助)。我不会在实际代码中返回随机数 - 这只是一个占位符,让我的预期行为正常工作。
    【解决方案3】:

    Throttle 只会在至少有 500 毫秒的间隙(在您的情况下)时才让值通过。由于GeneratePoints 推送值的速度比这快得多,因此不会发生任何事情。 Sample 可能是您想要的运算符,在这种情况下,它将每 500 毫秒产生一个值。

    Source:       1111111111111111111----------111---111111
    Throttle (5): -----------------------1-----------------
    Sample (5):   ----1----1----1----1---1------------1----1
    

    【讨论】:

    • 不幸的是,Sample 无济于事 - 我得到了相同的结果(未调用观察者),只是将采样间隔降低到 1 毫秒以下根本不会调用观察者。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-22
    • 1970-01-01
    • 1970-01-01
    • 2015-12-13
    • 2015-08-23
    • 1970-01-01
    相关资源
    最近更新 更多