【问题标题】:Signal Filter with Rx.NET带有 Rx.NET 的信号滤波器
【发布时间】:2015-11-03 16:16:48
【问题描述】:

我想在 Rx.NET 中实现一个信号滤波器,它从一组初始系数开始。随着时间的推移,滤波器系数必须从观测数据值的快照中重新计算。

这是一个小原型,展示了它应该如何工作。为简单起见,我选择滤波器长度和用于重新计算滤波器系数的历史数据值的数量相同(示例中为 3)。

该示例使用bufferedAt10 中的副作用来重新计算系数。这不是我想要的。

在实际应用中,数据以不规则的时间步长出现,系数应每天或每周在特定时间更新一次。我可以轻松地使缓冲区更长,但是如何让系统运行并以一种干净的功能方式从观察者那里更改滤波器系数?

       // create a hot obvervable to produce data
        const int bufLen = 3;
        var rng = new Random();
        var period = TimeSpan.FromSeconds(0.5);
        var observable = Observable.Interval(period)
        .Select(i => new {Time = DateTime.Now, Price = rng.NextDouble()})
        .Do(e => Console.WriteLine("original : {0}", e))
        .Publish();
        observable.Connect();

        Console.WriteLine("Press any key to subscribe");
        Console.ReadKey();

        // buffer of length bufLen used for filter calculation (every tick) and filter 
        // coefficient update (at a lower frequency)
        var buffered = observable.Buffer(bufLen, 1);

        // apply the signal filter with coefficients in `coeff`
        var coeff = new List<Double>() {1.0, 1.0, 1.0};  // these will be updated on the way from new data 
        var filtered = buffered.Select(e =>
        {
            var f = 0.0;
            for (var i = 0; i < bufLen; i++)
            {
                f += e[i].Price*coeff[i]; // apply the filter with coefficients `coeff`
            }
            return new {Time = DateTime.Now, FilteredPrice = f};
        });

        var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2]));

        // recalculate the filter coefficients say every 10 seconds 
        var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds / 10) * 10);

        var s2 = bufferedAt10.Subscribe(e =>
        {
            Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time);
            for (var i = 0; i < bufLen; i++)
            {
                // a prototypical function that takes the buffer and uses it to "recalibrate" the filter coefficients
                coeff[i] = coeff[i] + e[bufLen - 1 - i].Price;
            }
            Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]);
        });

感谢您提供任何好的建议。

【问题讨论】:

    标签: c# system.reactive reactive-programming


    【解决方案1】:

    以下内容未经测试,但我认为它应该涵盖您需要的内容。其背后的想法是,您将流分开,对一个进行系数更新,然后将它们与WithLatestFrom 重新组合在一起。我使用SampleScan 执行期间“调整”。您的自定义时间戳可以通过使用 TimeStamp 运算符来完成。您也可以考虑将Publish 向下移动到Buffer 之外,否则您将有两个流生成缓冲区,但这取决于您。

    const int bufLen = 3;
    var rng = new Random();
    var period = TimeSpan.FromSeconds(0.5);
    var observable = Observable.Interval(period)
      .Select( => rng.NextDouble())
      .Publish();
    observable.Connect();
    
    Console.WriteLine("Press any key to subscribe");
    Console.ReadKey();
    
    var buffered = observable.Buffer(bufLen, 1);
    
    var seed = new [] {1.0, 1.0, 1.0};
    
    var coefficients = buffered
                         //Samples for a new value every 10 seconds
                         .Sample(TimeSpan.FromSeconds(10))
                         //Updates the seed value and emits it after every update
                         .Scan(seed, 
                           //Use good old fashion Linq
                           (coeff, delta) => coeff.Zip(delta.Reverse(), 
                                             (c, d) => c + d.Price)
                                             .ToArray()
                           );
    
    //Emits a new value everytime buffer emits, and combines it with the latest
    //values from the coefficients Observable
    //Kick off coefficients with the seed otherwise you need to wait 10 seconds 
    //for the first value.
    buffer.WithLatestFrom(coefficients.StartWith(seed), (e, coeff) => {
      return e.Zip(coeff, (x, c) => x.Price * c).Sum();
    })
    .TimeStamp()
    .Subscribe(e => Console.WriteLine("filtered : {0}", e);
    

    【讨论】:

    • 非常感谢,我明白了。我猜WithLatestFrom 应该是CombineLatest
    • @Daniel CombineLatest 将在源 Observable 发出 either 时触发。 WithLatestFrom 只会在第一个源发射时触发。它可能仅在预发布版本 2.3.0-beta 中可用
    猜你喜欢
    • 2014-04-24
    • 1970-01-01
    • 2011-06-02
    • 2015-10-26
    • 2018-11-28
    • 2011-10-26
    • 2023-03-12
    • 2020-09-16
    • 2023-04-10
    相关资源
    最近更新 更多