【问题标题】:Observable throttling可观察到的节流
【发布时间】:2016-10-07 08:56:15
【问题描述】:

我正在尝试使用响应式扩展来实现事件限制。 我有一个系统,其中可能会针对特定用户或其他实体类型以高频率引发事件。 我需要将事件延迟特定的时间量,一旦超时到期,就会使用最后一个值引发事件。

我做的是这个

 private Subject<int> userBalanceObservable = new Subject<int>();
 userBalanceObservable.Sample(TimeSpan.FromSeconds(sampleSeconds))
            .Subscribe(sample => OnRaiseBalanceEvent(sample));

当事件发生时

userBalanceObservable.OnNext(userId);

编辑

这种方法的问题是事件是为传递给 OnNext 的最后一个值引发的,我真正需要的是为传递给 OnNext 的每个值设置一个延迟。

例如 OnNext(1),OnNext(2),OnNext(3) 我需要延迟调用 1,2,3 而我只得到最后一个值 3。

【问题讨论】:

  • 下次尝试发布最低完整可验证示例 (stackoverflow.com/help/mcve),以便我们确切知道您要实现的目标。理想情况下是单元测试。
  • 您确实需要向我们展示所有代码 - 特别是 OnRaiseBalanceEvent 中发生了什么以及 userBalanceObservable 如何获取其值。不过,作为一个小提示,如果您使用的是 Subject,您可能做错了什么。

标签: c# system.reactive


【解决方案1】:

每次达到采样间隔时,Sample 都会发布最后一个值。如果这是您需要的行为,那很好。看看http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample 了解更多信息+其他减缓排放速度的方法。

关于您的问题的新信息:

如果您想在达到某个超时后发出所有值,您可以将这些值分组,直到超时为止(注意:如果您继续添加事件而没有超时,您可能会因为频率而耗尽内存事件排放量)

您可以创建一个缓冲区,直到达到Debounce 超时,请参阅 SO 上的此答案以获取指针:How to implement buffering with timeout in RX

【讨论】:

  • 实际上它似乎并没有像我预期的那样工作。我需要的是为每个特定项目而不是任何项目延迟样品。例如 OnNext(1),OnNext(2),OnNext(3) 我希望延迟调用 1,2,3 而我只得到最后一个值 3。反应式扩展是否支持这样的功能?跨度>
  • @NullReference - 这不是您在问题中要求的。你能更新一下问题吗?
【解决方案2】:

buffering 不工作吗?唯一的“问题”是 OnRaiseBalanceEvent 必须使用 list 而不是一个值,但计算机科学中的所有问题都可以通过另一个级别的间接来解决;)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Reactive.Subjects;
using System.Reactive.Linq;

namespace ConsoleApplication1
{
  class Program
  {
    static void Main(string[] args)
    {

      Subject<int> userBalanceObservable = new Subject<int>();
    userBalanceObservable.Buffer(TimeSpan.FromSeconds(2)) //get List of items
                     .Subscribe(sampleList => ProcessSamples(sampleList));

      int cont = 0;

      while (!Console.KeyAvailable)
        {
        userBalanceObservable.OnNext(cont);
        cont++;
        userBalanceObservable.OnNext(cont);
        cont++;
        Thread.Sleep(1000);
      }

    }

    private static void ProcessSamples(IList<int> sampleList)
    {
      Console.WriteLine("[{0}]", string.Join(", ", sampleList.ToArray()));
    }

  }
}

【讨论】:

  • 嗯,我认为缓冲区可以完成这项工作。我可以使用 .Where(x=>x.Count >0) 使空序列静音,并使用 Distinct() 仅获取一个值实例。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多