【问题标题】:merge multiple observables to an observable array将多个 observable 合并到一个 observable 数组
【发布时间】:2011-10-29 21:59:31
【问题描述】:

您好,我正在尝试将多个可观察对象合并到可观察数组中。这是一个适用于 fsi 的示例。 (抱歉,篇幅较长)

#r "./bin/Debug/System.Reactive.dll"

open System
open System.Reactive.Linq

/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))

/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable

/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable

/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)

//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))

/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    

/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    

// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)

let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)

let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)

let combineLatestListToArray (list: IObservable<'T> List) =
    match list.Length with
    | 2 -> combineLatestArray list.[0] list.[1]
    | 3 -> combineLatest3Array list.[0] list.[1] list.[2]
    | 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
    | _ -> failwith "combine latest on unsupported list size"

type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }

        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()


// example code starts here

let rnd = System.Random()

let fooListeners = Collections.Generic.Dictionary()

let AddAFoo (foo : FooType) =
    let fooId = foo.StringKey()

    if fooListeners.ContainsKey(fooId)
        then fooListeners.[fooId]
    else
        let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
        fooListeners.Add(fooId,myObs)
        myObs

let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)

let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray

let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)

//execute until here to start example

// execute this last line to unsubscribe
mySub.Dispose()

我对这段代码有两个问题:

  1. 有没有更聪明的方法可以将 observables 合并到数组中? (因为我需要合并更大的数组,它变得非常冗长)

  2. 我想限制更新。我的意思是我希望在(比如说)相同的半秒窗口内发生的所有更新都作为数组上的一次更新来处理。理想情况下,我希望此窗口仅在第一次更新到来时打开,即如果 2 秒内没有更新到达,则有一个更新到达,然后我们等待并包含进一步的更新 0.5 秒,然后触发 observable。我不希望它每 0.5 秒定期发布一次,尽管没有触发任何可观察的。我希望这个描述足够清楚。

更新:我已决定接受 F# 答案之一,但我还没有完成 C# 答案的正义。我希望能够尽快正确地检查它们。

【问题讨论】:

    标签: f# system.reactive


    【解决方案1】:

    这是我能想到的最好的。我一直想解决这个问题。

    public static class Extensions
    {
        public static IObservable<IEnumerable<T>> CombineLatest<T>(this Observable observable, IEnumerable<IObservable<T>> observableCollection)
        {
            return observableCollection.CombineLatest();
        }
    
        public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
        {
            return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
            (
                Observable.Return(Enumerable.Empty<T>()),
                (o, n) => o.CombineLatest
                (
                    n,
                    (list, t) => list.Concat(EnumerableEx.Return(t))
                )
            );
        }
    }
    

    所以一个示例用法是:

    var obs = new List<IObservable<bool>> 
    { 
        Observable.Return(true), 
        Observable.Return(false), 
        Observable.Return(true) 
    };
    
    var result = obs.CombineLatest().Select(list => list.All(x => x));
    result.Subscribe(Console.WriteLine);
    Console.ReadKey();
    

    但是,您必须根据知识进行操作,直到 所有可观察对象都产生值,生成的 IObservable&lt;IEnumerable&lt;T&gt;&gt; 才会触发。这是我在场景中需要的,但可能不适合您的场景。

    我担心的是所有 .Concats 的性能。在扩展方法中处理可变集合可能会更高效。不确定。

    抱歉,我不知道 F#。这些天我会解决它。

    在您获得最终的 observable 后,只需使用 .Throttle 运算符完成节流。

    编辑:这个答案是对Enigmativity's recursive Yang的迭代。

    【讨论】:

      【解决方案2】:

      尽管 Gideon Engelberth 已经用解决问题的一种可能方法回答了您的问题。其他可能的方式可能如下所示,它不使用嵌套。

      let combineLatestToArray (list : IObservable<'T> list) = 
          let s = new Subject<'T array>()
          let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
          let cb (i:int,v:'T) = 
              arr.[i] <- v
              s.OnNext(arr |> Array.toList |> List.toArray)
          let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
                     |> Observable.Merge
          main.Subscribe(new Action<int * 'T>(cb)
                         ,new Action<exn>(fun e -> s.OnError(e)) 
                         ,new Action(fun () -> s.OnCompleted()) ) |> ignore
          s :> IObservable<'T array>
      

      让我知道这是否解决了您的问题,因为我没有进行太多测试:) 注意:这是第一部分,第二部分每个人都已经提到了你需要做的事情

      更新: 另一种实现:

      let combineLatestToArray (list : IObservable<'T> list) = 
          let s = new Subject<'T array>()
          let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
          let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
                     |> Observable.Merge
          async {
              try
                  let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
                  for i in se do
                      s.OnNext(i |> Array.toList |> List.toArray)
                  s.OnCompleted()
              with
              | :? Exception as ex -> s.OnError(ex)
          } |> Async.Start
          s :> IObservable<'T array>
      

      【讨论】:

      • 实现一和二的主要区别是什么? (对不起,我还在学习异步编程)
      • 这个解决方案在所有 observables 初始化之前返回值(这不一定是 showstopper
      • 在性能方面,它比嵌套解决方案快一点。大约 90 个 observables 的嵌套解决方案 -> 50% cpu
      • 这个解决方案有 90 个 observables -> 40% cpu
      • 第二个实现与第一个基于相同的概念,只是它更“实用”或 F#ish。
      【解决方案3】:

      很抱歉,这不是 F# - 我希望我有时间学习它 - 但这是 C# 中的一个可能答案。

      这里有一组扩展方法,它们将结合最新的从 IEnumerable&lt;IObservable&lt;T&gt;&gt;IObservable&lt;IEnumerable&lt;T&gt;&gt;

      public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
      {
          if (first == null) throw new ArgumentNullException("first");
          if (second == null) throw new ArgumentNullException("second");
          return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
      }
      
      public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
      {
          if (firsts == null) throw new ArgumentNullException("firsts");
          if (second == null) throw new ArgumentNullException("second");
          return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
      }
      
      public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
      {
          if (sources == null) throw new ArgumentNullException("sources");
          return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
      }
      
      public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
      {
          if (first == null) throw new ArgumentNullException("first");
          if (seconds == null) throw new ArgumentNullException("seconds");
          return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
      }
      
      public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
      {
          if (firsts == null) throw new ArgumentNullException("firsts");
          if (seconds == null) throw new ArgumentNullException("seconds");
          return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
      }
      
      private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
      {
          if (sources == null) throw new ArgumentNullException("sources");
          if (any == null) throw new ArgumentNullException("any");
          if (none == null) throw new ArgumentNullException("none");
          return Observable.Defer(() => sources.Any() ? any() : none());
      }
      

      它们可能效率不高,但它们确实可以处理任意数量的需要组合的可观察对象。

      我很想看到这些方法转换为 F#。

      至于你的第二个问题,我不确定我能不能回答你到目前为止所说的,因为CombineLatestThrottle 都失去了价值,所以之前更详细地了解你的用例可能是谨慎的尝试回答。

      【讨论】:

      • 真的很喜欢这个答案。我认为就 Rx 运算符而言可以 100% 完成,但我无法使用 Ix.Aggregate。我要为我的项目偷这个。非常感谢!这是我的白鲸。
      【解决方案4】:

      对于 1,List.foldList.toArray 以及一些 Observable 运算符的应用程序应该可以很好地工作。比如:

      let combineLatest observables =
          Observable.Select(
              (observables 
               |> List.fold (fun ol o 
                               -> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
                            ) (Observable.Return<_>([]))
              ), 
              List.toArray)
      

      由于嵌套,如果您有大量的 Observables,您可能最终会遇到性能问题,但至少在您求助于手写之前尝试一下是值得的。

      对于 2,我同意将 Throttling 应用于结果的其他答案。

      【讨论】:

      • 不错的答案。不要忘记反转列表。 List.rev &gt;&gt; List.toArray
      • 确实这很好用,非常优雅和令人印象深刻(我需要消化它!;-))我会牢记性能建议。
      【解决方案5】:
      1. 似乎Observable.Merge() 具有可变数量IObservables 的重载更接近您想要的。

      2. Observable.Buffer() 与时间重载将在这里做你想要的。在“没有事件”的情况下,Buffer 仍然会 OnNext() 一个空列表,让您对这种情况做出反应。

      【讨论】:

      • Merge 会是他想要的吗?他在上面的示例中使用CombineLatest。这适用于您希望将每个 observable 的最新值聚合为另一个值的情况。示例将是 CanDoX 可观察对象的集合,您希望在其中拥有聚合的可观察对象 CombineLatest 并执行 latestValues =&gt; latestValues.All(x =&gt; x) 以查看是否所有对象都有效(如果有更改)。我有这个问题很多。也对您解决此问题的方法感兴趣。
      猜你喜欢
      • 1970-01-01
      • 2018-11-16
      • 1970-01-01
      • 1970-01-01
      • 2017-08-04
      • 1970-01-01
      • 2019-10-09
      • 2019-03-05
      • 1970-01-01
      相关资源
      最近更新 更多