【问题标题】:What is the best way to "rate limit" consuming of an Observable?“速率限制”消耗 Observable 的最佳方法是什么?
【发布时间】:2012-07-01 17:44:45
【问题描述】:

我有一堆事件进来,我必须毫无损失地执行所有这些事件,但我想确保它们在适当的时间段被缓冲和消耗。谁有解决办法?

我在 Rx 中找不到任何可以在不丢失事件的情况下做到这一点的操作符(Throttle - 丢失事件)。我也考虑过Buffered、Delay等……找不到好的解决方案。

我试图在中间放一个计时器,但不知何故它根本不起作用:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }

【问题讨论】:

  • 你可以添加一个大理石图表来显示你拥有什么,你想要什么?和其他人一样,我不确定您要达到什么目标,因为我认为 Buffer 就可以了。
  • 你有什么限制?

标签: system.reactive


【解决方案1】:

这个问题不是 100% 清楚的,所以我在做一些假设。

Observable.Delay 不是您想要的,因为这会在每个事件到达时产生延迟,而不是为处理创建均匀的时间间隔。

Observable.Buffer 不是您想要的,因为这会导致每个给定时间间隔内的所有事件都传递给您,而不是一次一个。

因此,我相信您正在寻找一种解决方案,该解决方案可以创建某种 节拍器,它会滴答作响,并在每次滴答声中为您提供一个事件。这可以简单地使用Observable.Interval 构建节拍器,Zip 将其连接到您的源:

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

这将每 5 秒触发一次(在上面的示例中),并按顺序为您提供原始项目。

此解决方案的唯一问题是,如果您在(例如)10 秒内没有更多源元素,当源元素到达时,它们将立即被发送出去,因为一些“触发”事件就在那里等着他们。该场景的大理石图:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g

这是一个非常合理的问题。这里已经有两个问题可以解决:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

提供的解决方案是主要的Drain 扩展方法和次要的Buffered 扩展。我已经将这些修改得更简单(不需要Drain,只需使用Concat)。用法是:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

扩展方法StepInterval

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

【讨论】:

  • 谢谢。仍然不是我真正想要的,但它向我展示了一些想法。我只是对 Rx 感到沮丧——为什么它应该如此复杂并且没有适当的文档。学习曲线陡峭,需要对该主题有广泛的了解才能获得有价值的东西。 #失败
  • 同意。这就是为什么我花了很多时间编写 IntroToRx.com 来帮助处于你位置的人。这很难,还有很多东西要学。
  • 我真的觉得这些 Rx 运算符很难阅读和推理。我认为这是我的局限——这可能是因为我有一个视觉思维,我无法想象结果。是否有机会获得此答案中代码的作用的大理石图?
  • 2018年遇到同样问题的人,RXJS debounceTime正是我要找的:medium.com/aviabird/…
  • 感谢您的精彩回答。我从中得到了一些想法,但我不确定如何将其应用于 RxJava。你能指出我正确的方向吗? RxJava 中没有 DrainSelect
【解决方案2】:

我知道这可能太简单了,但这会奏效吗?

var intervaled = source.Do(x => { Thread.Sleep(100); });

基本上,这只是在值之间设置了最小延迟。太简单了?

【讨论】:

  • 这匹配并修复了 OP 的行为 IntervalThrottle 这真的明智吗?
  • Eeek...阻塞线程!?面对 Rx 负责人,那种苍蝇对吗?
  • 是的,从最纯粹的意义上说,这是针对 Rx 的,但要求是阻止
【解决方案3】:

按照 Enigmativity 的回答,如果您只想将所有值延迟一个 TimeSpan,我不明白为什么 Delay 不是您想要的运算符

  GetInitSequence()
        .Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here
        .Subscribe(
            item =>
                {
                    Console.WriteLine(DateTime.Now);
                    // Process item
                }
        );

【讨论】:

    【解决方案4】:

    Observable.Buffer 怎么样?这应该将 1s 窗口中的所有事件作为单个事件返回。

    var xs = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5));
    bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); });
    

    这可能是你问的不是很清楚。你的代码应该做什么?看起来你只是通过为每个事件创建一个计时器来延迟。它还破坏了 observable 的语义,因为 next 和 complete 可能发生在 next 之前。

    请注意,这也仅在所使用的计时器上同样准确。通常,计时器的精度最高可达 16 毫秒。

    编辑:

    你的例子变成了,并且 item 包含了窗口中的所有事件:

    GetInitSequence()
                .Buffer(TimeSpan.FromSeconds(5))
                .Subscribe(
                    item =>
                        {
                            Console.WriteLine(DateTime.Now);
                            // Process item
                        }
                );
    

    【讨论】:

      猜你喜欢
      • 2010-11-29
      • 1970-01-01
      • 2010-10-14
      • 2014-05-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多