【问题标题】:Reactive Extensions RX sample several observables on interval to get latest valuesReactive Extensions RX 在间隔上采样几个 observables 以获取最新值
【发布时间】:2013-06-12 15:34:19
【问题描述】:

我有一种情况,我需要在控制 observable 定义的时间对多个 observable 进行采样。

我会直接跳到大理石图。

co 下面是控制 observable (Observe.Interval(TimeSpan.FromSeconds(1))

o* 是不同类型的 observables

co tick  ---------x---------x---------x---------x---------x---------x-
o1 int   --2----1-------4--------3--------1-------2----3-------5------
o2 bool  -t-----------f---------t---------------------------f---------

result   ---------Z---------Z---------Z---------Z---------Z---------Z-

我需要开发一种扩展方法,该方法仅在控制通道上有滴答声时对每个 o-observable 的最新值进行采样

Z 将是从 o-observables 传递最新采样值到选择器函数的结果。它有点像 CombineLatest,但不完全是。

即,作为一个非常简化的示例,假设 func 看起来像这样:

 (i, b) => {if (b) return i; else return 0; }

我希望这种情况下的结果是

 result   ---------1---------0---------3---------1---------3---------0-

前 1 是因为 o2 是最后一个为真,而 o1 是最后一个 1 第二个 0 是因为 o2 最后是假的。

请注意,o2 并不总是在每个样本之间产生一个值。我仍然需要获取最后一个值。 (.Sample() 不起作用。)。实际上,涉及的函数和类型更复杂,所以不要因为上面的 int 和 bool 类型而做出假设。

另外,我需要选择器函数在每个刻度上只运行一次。

这是我目前的解决方案,但它不符合上述要求,因为我在 CombineLatest 之后采样。

 var combined = interval.CombineSampled(
            Status,
            H1,
            H2
            M,
            T,
            HMode,
            TMode,
            (i, s, hr1, hr2, m, t, hMode, tMode) =>
            {
               ... (left out)

               return result;
            });

合并采样:

public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
    {
        return controllingSource.Publish(s => s.CombineLatest(source2, source3, source4, source5, source6, source7, source8, selector).SampleEx(s));
    }


public static IObservable<T> SampleEx<T, S>( this IObservable<T> source, IObservable<S> samples )
    {
        // This is different from the Rx version in that source elements will be repeated, and that
        // we complete when either sequence ends. 
        return Observable.Create( ( IObserver<T> obs ) =>
            {
                object gate = new object();
                bool hasSource = false;
                var value = default(T);

                return new CompositeDisposable(
                    source.Synchronize( gate ).Subscribe( v => { value = v; hasSource = true; }, obs ),
                    samples.Synchronize( gate ).Subscribe( _ => { if ( hasSource ) obs.OnNext( value ); }, obs )
                );
            } );
    }

【问题讨论】:

    标签: system.reactive sample sample-data


    【解决方案1】:

    将您的 CombineSampled 更改为:

    public static IObservable<TResult> CombineSampled<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this IObservable<TSource1> controllingSource, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> selector)
    {
        return controllingSource.Publish(s => s
            .CombineLatest(source2, source3, source4, source5, source6, source7, source8,
               Tuple.Create<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8>)
            .SampleEx(s)
            .Select(t => selector(t.Item1, t.Item2, t.Item3, t.Item4, t.Item5, t.Item6, t.Item7, t.Item8));
    }
    

    【讨论】:

    • 谢谢。按预期工作! (t.Item8 应该是 t.Rest.Item1)
    • 酷。我想你可以使用匿名类型而不是元组:(a, b, ... f, g) =&gt; new { a, b, ..., f, g }
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-03
    • 1970-01-01
    • 2012-03-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多