【问题标题】:"Buffer until quiet" behavior from Reactive?Reactive的“缓冲直到安静”行为?
【发布时间】:2016-02-22 15:22:37
【问题描述】:

我的问题有点像 Nagle algorithm 旨在解决的问题,但不完全一样。我想要将来自IObservable<T>OnNext 通知缓冲到IObservable<IList<T>>s 序列中,如下所示:

  1. 当第一个T 通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果另一个T 通知在倒计时到期之前到达,请将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时结束(即生产者已经静默一段时间),将所有缓冲的 T 通知作为单个聚合 IList<T> 通知转发。
  4. 如果在倒计时结束前缓冲区大小超过某个最大值,请无论如何发送。

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler) 看起来很有希望,但它似乎会定期发送汇总通知,而不是“在第一个通知到达时启动计时器并在其他通知到达时重新启动它”我想要的行为,而且它也如果没有从下方生成通知,则在每个时间窗口结束时发送一个空列表。

我确实想要删除任何T 通知;只是缓冲它们。

这样的东西存在吗,还是我需要自己写?

【问题讨论】:

标签: c# system.reactive reactive-programming


【解决方案1】:

SO 上存在一些类似的问题,但不完全是这样。 这是一个可以解决问题的扩展方法。

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

【讨论】:

  • 这似乎有效。你能解释一下它在做什么吗?我之前没遇到过GroupByUntil
  • 当然。 GroupByUntil 对传入值进行分组,直到作为第二个参数提供的 Observable 提供了一个值。在这种情况下,我们将所有通知分组到同一个组并等待来自ThrottleBuffer 的值(因此是Merge)。前者保证静音阈值,后者保证最大限制。
  • 不错的解决方案。我提出了一个轻微的变化,而不是使用 Buffer,在大缓冲区或繁忙流的情况下使用更少的内存
【解决方案2】:

有趣的运算符。 Supertopi 的答案是一个很好的答案,但是可以进行改进。如果maxAmount 很大,并且/或者通知率很高,那么使用Buffer 将通过分配很快被丢弃的缓冲区来烧毁GC。

为了在达到maxAmount 后关闭每个GroupBy Observable,您无需捕获所有这些元素的Buffer 即可知道它何时已满。根据 Supertopi 的回答,您可以将其稍微更改为以下内容。它不会收集maxAmount 元素中的Buffer,而是在看到流中的maxAmount 元素后发出信号。

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Take(maxAmount)
                                                 .LastAsync()
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

【讨论】:

    【解决方案3】:

    不错的解决方案。在我看来,使用现有运算符创建行为只是为了方便而不是为了性能。

    此外,我们应该始终返回 IEnumerable 而不是 IList。 返回最小派生类型 (IEnumerable) 将为您留出最大的余地来更改底层实现。

    这是我实现自定义运算符的版本。

    public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
        {
            var buffer = new List<TValue>();
    
            return Observable.Create<IEnumerable<TValue>>(observer =>
            {
                var aTimer = new Timer();
                void Clear()
                {
                    aTimer.Stop();
                    buffer.Clear();
                }
                void OnNext()
                {
                    observer.OnNext(buffer);
                    Clear();
                }
                aTimer.Interval = threshold.TotalMilliseconds;
                aTimer.Enabled = true;
                aTimer.Elapsed += (sender, args) => OnNext();
                var subscription = @this.Subscribe(value =>
                {
                    buffer.Add(value);
                    if (buffer.Count >= maxAmount)
                        OnNext();
                    else
                    {
                        aTimer.Stop();
                        aTimer.Start();
                    }
                });
                return Disposable.Create(() =>
                {
                    Clear();
                    subscription.Dispose();
                });
            });
        }
    

    通过对比其他解决方案的性能测试,它可以节省高达 30% 的 CPU 功率并解决内存问题。

    【讨论】:

    • 我觉得你的回答不完整?
    • @MohammadKanan 是的,因为我没能在我的第一篇文章中让它变得丰富多彩。所以我一直在编辑我的答案。现在应该没问题了。
    • 组合现有运算符而不是滚动自己的一个好处是它使您对结果充满信心。您编写的代码 看起来 可以工作,但我想在生产环境中使用它之前进行大量测试(在我问这个问题以来,我对这个操作员的需求已经消失了)。仍然值得一票
    • 我还想将类型从 IList 更改为 IEnumerable。是的,它使您可以灵活地在以后交换不同的东西,但是如果调用者(有意或无意地)依赖于您返回特定类型,这样做可能会触发调用者的错误。例如,如果您在之前分发 List 时开始返回一个惰性求值的 IEnumerable。当然,这是他们自己的错,但总的来说,我希望尽可能地避免滥用我的 API。您的里程可能会有所不同,但我更愿意对我要返回的内容进行相当具体的说明。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-07
    • 2011-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多