【问题标题】:reactive extensions sliding time window反应式扩展滑动时间窗口
【发布时间】:2017-09-08 07:05:40
【问题描述】:

我有一系列股票报价,我想获取过去一小时内的所有数据并对其进行一些处理。我正在尝试通过响应式扩展 2.0 来实现这一点。我在另一篇文章中阅读了使用间隔,但我认为已弃用。

【问题讨论】:

  • 您是想在每次有新值出现时都获得最后一小时的值,还是只想每小时获得一个小时的股票价格?
  • 每次有新值进入时,我都想要最后一小时的值。我已经查看了 Buffer,但我认为它不是正确的。

标签: c# system.reactive


【解决方案1】:

这种扩展方法能解决你的问题吗?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}

【讨论】:

    【解决方案2】:

    您正在寻找窗口运算符! 这是我写的关于使用重合序列(序列的重叠窗口)的长篇文章 http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

    所以如果你想建立一个滚动平均值,你可以使用这种代码

    var scheduler = new TestScheduler();
    var notifications = new Recorded<Notification<double>>[30];
    for (int i = 0; i < notifications.Length; i++)
    {
      notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
    }
    //Push values into an observable sequence 0.1 seconds apart with values from 0 to 30
    var source = scheduler.CreateHotObservable(notifications);
    
    source.GroupJoin(
          source,   //Take values from myself
          _=>Observable.Return(0, scheduler), //Just the first value
          _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
          (lhs, rhs)=>rhs.Sum())    //Aggregation you want to do.
        .Subscribe(i=>Console.WriteLine (i));
    scheduler.Start();
    

    我们可以看到它在接收值时输出滚动和。

    0、1、3、6、10、15、21、28...

    【讨论】:

    • 我无法让这段代码工作。 rhs.Sum() 产生 IObservable&lt;double&gt; 而不是 double,所以我假设 .Concat() 也需要?但即便如此,也只会输出零
    • 抱歉,我已经更正了答案。准确地说,使用TestScheduler 时,Observable.Return(0) 也需要传入调度程序,即 Observable.Return(0, scheduler)。也许更有意义的值是 Observable.Return(Unit.Default, scheduler)` 或 Observable.Empty&lt;Unit&gt;(scheduler)
    • @LeeCampbell 我在使用您的解决方案来满足我有点不同的需求时遇到了麻烦。我需要得到两个值(最小值和最大值),而不仅仅是一个。我尝试了两种方法 - 首先在聚合步骤输出一个 (rhs.MinBy(...), rhs.MaxBy(...)) 的元组,然后在 .Subscribe(diff =&gt; Math.Avg(diff.Item1, diff.Item2) 中使用它(只是一个例子)。这不起作用,因为编译器无法将 diff 参数转换为双精度/十进制。然后我尝试只输出(_, rhs) =&gt; rhs.Subscribe(...) 并计算其中的最小值/最大值,但没有用,使用 .Scan 也没有。你看看有什么可以做的吗?
    • @LeeCampbell 也许更好地说,我不知道如何从流中获取值以执行计算 - 即从System.IObservable&lt;System.Collections.Generic.IList&lt;System.Reactive.EventPattern&lt;...EventArgs&gt;&gt;&gt;“转换”为十进制/双精度/我得到的任何其他对象从流中。我不想求助于使用 .Wait()、.Last() 或其他阻塞结构。非常感谢您的意见,顺便说一句,我喜欢您的 introtorx 网站。
    • @pun11 为您正在努力通过的单元测试创​​建一个新帖子。在帖子中抄送我,但我敢打赌社区会比我更快地回复。
    【解决方案3】:

    Buffer 很可能就是您要查找的内容:

    var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1));
    

    【讨论】:

    • 我认为这将产生 1 小时数据的非重叠窗口,而我基本上希望每次有新值进入时都有一个滑动窗口。
    • 啊...那只是 ticks.Window(TimeSpan.FromHouse(1)) 那么。
    • 这将提供 1 小时的窗口,但不会重叠窗口。您需要提供窗口打开和关闭 observables,或跳过计数/次。
    【解决方案4】:

    或者假设数据已经是Timestamped,只需使用Scan

        public static IObservable<IReadOnlyList<Timestamped<T>>> SlidingWindow<T>(this IObservable<Timestamped<T>> self, TimeSpan length)
        {
            return self.Scan(new LinkedList<Timestamped<T>>(),
                             (ll, newSample) =>
                             {
                                 ll.AddLast(newSample);
                                 var oldest = newSample.Timestamp - length;
                                 while (ll.Count > 0 && list.First.Value.Timestamp < oldest)
                                     list.RemoveFirst();
    
                                 return list;
                             }).Select(l => l.ToList().AsReadOnly());
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-01-17
      • 1970-01-01
      • 1970-01-01
      • 2016-08-01
      • 2017-07-01
      • 2014-01-22
      • 1970-01-01
      • 2020-09-11
      相关资源
      最近更新 更多