【问题标题】:Rx.NET How do I buffer stream data as a moving (sliding) window without delay?Rx.NET 如何无延迟地将流数据缓冲为移动(滑动)窗口?
【发布时间】:2022-08-18 06:31:45
【问题描述】:

我正在使用 Rx.NET 库连接到交易数据,我想做的是连续缓冲最后 100 秒的数据并每 2 秒分析一次。 我正在使用以下 Buffer 方法重载:

        tradeStream
            .Buffer(TimeSpan.FromSeconds(100), TimeSpan.FromSeconds(2))
            .Subscribe(data =>
            {
                //...
            });

现在的问题是它作为 100 秒缓冲区工作 + 它等待 2 秒。 有没有办法以一定的间隔立即拍摄最后 100 秒的“快照”?

    标签: c# reactive-programming system.reactive sliding-window rx.net


    【解决方案1】:

    听起来您需要内置运算符Window。有一个重载需要两个TimeSpan

    IObservable<IObservable<Trade>> query =
        tradeStream
            .Window(TimeSpan.FromSeconds(100.0), TimeSpan.FromSeconds(2.0));
    

    这样就开始了一个新的 observable,它每 2 秒持续 100 秒。

    【讨论】:

    • 好的!您可能应该在Window 之后添加.SelectMany(w =&gt; w.ToList()),以生成预期的结果序列。这种方法的一个缺点可能是增加了内存消耗,因为每个缓冲项同时存储在多个列表中。
    • 当然需要某种.SelectManyFunc&lt;IObservable&lt;Trade&gt;, IObservable&lt;?&gt;&gt; 来处理单独的流。
    【解决方案2】:

    您可以考虑使用Replay 运算符将数据缓冲指定的持续时间,并使用Observable.Interval 方法在每个间隔发出缓冲数据。要完成生成的序列,您可以使用 TakeUntil 运算符。下面是一个可能的实现。

    /// <summary>
    /// Emits a list every interval that contains all the currently buffered elements.
    /// </summary>
    public static IObservable<IList<TSource>> BufferHistorical<TSource>(
        this IObservable<TSource> source,
        TimeSpan interval, TimeSpan replayDuration)
    {
        ArgumentNullException.ThrowIfNull(source);
        if (interval < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(interval));
        if (replayDuration < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(replayDuration));
    
        return source.Replay(replayed =>
        {
            return Observable
                .Interval(interval)
                .SelectMany(_ => replayed
                    .TakeUntil(Observable.Return(Unit.Default, Scheduler.CurrentThread))
                    .ToList())
                .TakeUntil(replayed.LastOrDefaultAsync());
        }, replayDuration, Scheduler.Immediate);
    }
    

    使用示例:

    tradeStream
        .BufferHistorical(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(100))
        .Subscribe(data =>
        {
            //...
        });
    

    【讨论】:

    • 谢谢你!也会检查这个解决方案!
    • @AntonBocharov 请注意,使用此解决方案,您将在 2 秒后获得第一个列表,再过 2 秒后获得下一个列表等。您无需等待 100 秒即可获得第一个列表。
    【解决方案3】:

    好吧,我必须承认我的结论是错误的,认为这是一个 buffer() 扩展问题。

    缓冲区发射后发生延迟的原因是观察者内部的一项长时间运行的任务。我找到了一个符合我期望的解决方案。我只需要收集一些数据并调用一个方法在异步线程中处理它。 即使处理时间超过缓冲间隔,顺序仍然是一致的。

    var tlist = Observable.Range(1, 100)
    .Zip(Observable.Interval(TimeSpan.FromMilliseconds(1000)), (i, t) => i);
    
    var list = tlist.Publish();
    
    
    list.Subscribe(b => {
    Console.WriteLine(b);
    });
    
    list
    .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
    .SelectMany(async b =>
    {
        Console.WriteLine(string.Join("_", b));
        await Task.Delay(4000);
        return Task.FromResult(0);
    })
    .Subscribe();
    
    list.Connect();
    

    【讨论】:

      猜你喜欢
      • 2018-03-03
      • 1970-01-01
      • 2015-07-12
      • 1970-01-01
      • 2019-03-04
      • 2011-12-16
      • 2016-03-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多