【问题标题】:Rx IObservable buffering to smooth out bursts of eventsRx IObservable 缓冲以平滑突发事件
【发布时间】:2010-12-22 01:45:44
【问题描述】:

我有一个 Observable 序列,它以快速突发的方式产生事件(即:五个事件一个接一个,然后是长时间的延迟,然后是另一个快速突发事件,等等)。我想通过在事件之间插入短暂的延迟来消除这些突发。以下图为例:

原始:--oooo--------------ooooo-----oo----ooo| 缓冲:--o--o--o--o--------o--o--o--o--o--o--o---------- o--o--o|

我目前的方法是通过Observable.Interval() 生成一个类似节拍器的计时器,它会在可以从原始流中提取另一个事件时发出信号。问题是我不知道如何将该计时器与我的原始无缓冲可观察序列结合起来。

IObservable.Zip() 接近于做我想做的事,但它只在原始流比计时器更快地产生事件时才有效。一旦原始流中出现明显的停顿,计时器就会建立一系列不需要的事件,然后立即与原始流中的下一个突发事件配对。

理想情况下,我想要一个具有以下函数签名的 IObservable 扩展方法,它可以产生我上面概述的行为。现在,来拯救我的 StackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS。我是 Rx 的新手,所以如果这是一个非常简单的问题,我深表歉意......


1。简单但有缺陷的方法

这是我最初的天真和简单的解决方案,有很多问题:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

第一个明显的问题是内部订阅返回的 IDisposable 丢失了原始源,因此无法终止订阅。在此方法返回的 IDisposable 上调用 Dispose 会终止计时器,但不会终止底层原始事件源,它现在不必要地填充队列,没有人可以从队列中提取事件。

第二个问题是异常或流结束通知无法从原始事件流传播到缓冲流 - 在订阅原始源时它们会被简单地忽略。

最后但并非最不重要的一点是,现在我的代码会定期唤醒,无论是否实际上有任何工作要做,我宁愿在这个美妙的新反应式世界中避免这种情况。


2。方法过于复杂

为了解决我最初的简单方法中遇到的问题,我编写了一个 much 更复杂的函数,其行为类似于 IObservable.Delay()(我使用 .NET Reflector 来读取该代码并将其用作我的功能的基础)。不幸的是,AnonymousObservable 等许多样板逻辑在 system.reactive 代码之外无法公开访问,因此我不得不复制并粘贴 很多 代码。此解决方案似乎可行,但鉴于其复杂性,我不太相信它没有错误。

我简直不敢相信没有办法使用标准响应式扩展的某种组合来实现这一点。我讨厌我在不必要地重新发明轮子的感觉,而我正在尝试构建的模式似乎是一个相当标准的模式。

【问题讨论】:

  • 您没有指定语言。如果你把一个放在标签里,熟悉它的人就会看到它。但我的第一个想法是在收到事件时将事件添加到堆栈中(无缓冲),然后在计时器上弹出。
  • @Robert,与其说是栈不如说是队列吧?
  • @spender - 当然。显然我需要睡觉了。
  • 是的,这正是我的想法,但是让我难以理解的部分是如何使用 Rx 框架中提供的标准 IObservable 方法来完成它。我在概述我最初的简单化(和有缺陷)方法的问题中添加了更多细节。真正的困难在于弄清楚如何将将原始事件推送到队列中的逻辑以及在常规计时器上从队列中拉出的逻辑封装到单个 IObservable 中。

标签: c# buffering system.reactive


【解决方案1】:

这实际上是 A way to push buffered events in even intervals 的副本,但我将在此处包含一个摘要(原件看起来很混乱,因为它考虑了一些替代方案)。

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Drain(x => 
        Observable.Empty<int>()
            .Delay(minDelay)
            .StartWith(x)
    );
}

我的 Drain 实现类似于 SelectMany,除了它等待前一个输出首先完成(您可以将其视为 ConactMany,而 SelectMany 更像 MergeMany)。内置的Drain 不能以这种方式工作,因此您需要包含以下实现:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

【讨论】:

  • 啊哈!我知道必须有一种相对直接的方式来通过现有运营商的组合来做到这一点。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多