【发布时间】:2011-10-29 21:59:31
【问题描述】:
您好,我正在尝试把多个可观察对象(observable)合并成一个发出数组的可观察对象。下面是一个可以在 fsi 中运行的示例。(抱歉,篇幅较长)
#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Registers all three observer callbacks on `observable`: `next` receives each
/// value, `error` receives the exception, and `completed` is called with unit.
/// Returns the result of the underlying Subscribe call.
let subscribeComplete next error completed (observable: IObservable<'T>) =
    let onNext = Action<'T>(fun value -> next value)
    let onError = Action<exn>(fun ex -> error ex)
    let onCompleted = Action(fun () -> completed ())
    observable.Subscribe(onNext, onError, onCompleted)
/// Subscribes with a value callback and an error callback; completion does nothing.
let subscribeWithError next error observable =
    let onCompleted () = ()
    observable |> subscribeComplete next error onCompleted
/// Subscribes with a value callback only and returns the subscription handle.
/// Errors raised by the observable are discarded by this helper.
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable =
    let dropError (_: exn) = ()
    observable |> subscribeWithError next dropError
/// Curried wrapper around `Observable.Generate`: the five arguments are wrapped
/// as delegates and forwarded in order (initial state, condition, iteration
/// step, result selector, time selector).
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
                (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
    let condition = Func<'TS, bool>(termCond)
    let iterate = Func<'TS, 'TS>(iterStep)
    let resultSelector = Func<'TS, 'TR>(resSelect)
    let timeSelector = Func<'TS, System.TimeSpan>(timeSelect)
    Observable.Generate(initState, condition, iterate, resultSelector, timeSelector)
/// Projects every value of `observable` through `f`, giving an observable of the results.
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    let selector = Func<'T, 'U>(f)
    observable.Select(selector)
/// Pairs the latest values of two observables: the result produces a tuple
/// whenever either source has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> =
    let toPair = Func<'T, 'U, 'T * 'U>(fun left right -> (left, right))
    obs1.CombineLatest(obs2, toPair)
/// Combines the latest values of three observables into a triple, produced
/// whenever any source has a new value. Built by pairing the first two
/// sources and then combining that pair with the third.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> =
    let toPair = Func<'T, 'U, 'T * 'U>(fun a b -> (a, b))
    let firstTwo = Observable.CombineLatest(obs1, obs2, toPair)
    let addThird = Func<'T * 'U, 'V, 'T * 'U * 'V>(fun (a, b) c -> (a, b, c))
    Observable.CombineLatest(firstTwo, obs3, addThird)
/// Combines the latest values of four observables into a 4-tuple, produced
/// whenever any source has a new value. Built on `combineLatest3` for the
/// first three sources, then combined with the fourth.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> =
    let firstThree = combineLatest3 obs1 obs2 obs3
    let addFourth = Func<'T * 'U * 'V, 'W, 'T * 'U * 'V * 'W>(fun (a, b, c) d -> (a, b, c, d))
    Observable.CombineLatest(firstThree, obs4, addFourth)
// second section: the same combinators, producing arrays instead of tuples

/// Combines two same-typed observables into an observable of two-element
/// arrays, ordered [| obs1 value; obs2 value |].
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    let pairs = combineLatest obs1 obs2
    obsMap (fun (a, b) -> [| a; b |]) pairs
/// Combines three same-typed observables into an observable of three-element
/// arrays, ordered by argument position.
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    let triples = combineLatest3 obs1 obs2 obs3
    obsMap (fun (a, b, c) -> [| a; b; c |]) triples
/// Combines four same-typed observables into an observable of four-element
/// arrays, ordered by argument position.
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    let quads = combineLatest4 obs1 obs2 obs3 obs4
    obsMap (fun (a, b, c, d) -> [| a; b; c; d |]) quads
/// Combines the latest values of every observable in `list` into a single
/// observable of arrays. Element i of each produced array is the latest value
/// of the i-th observable in `list`.
///
/// Works for any non-empty list (previously only sizes 2, 3 and 4 were
/// supported). An empty list still fails with
/// "combine latest on unsupported list size".
let combineLatestListToArray (list: IObservable<'T> List) =
    match list with
    | [] -> failwith "combine latest on unsupported list size"
    | first :: rest ->
        // Extends an observable of arrays by one more source: every combined
        // update yields a fresh array with the new source's value appended.
        let appendSource (acc: IObservable<'T[]>) (source: IObservable<'T>) =
            combineLatest acc source
            |> obsMap (fun (values, latest) -> Array.append values [| latest |])
        // Start from single-element arrays of the first source, then fold the
        // remaining sources in, left to right, so array order matches list order.
        let seed = first |> obsMap (fun value -> [| value |])
        List.fold appendSource seed rest
/// Example record carrying a name, an integer id and a floating-point value.
type FooType =
    { NameVal : string
      IdVal : int
      RetVal : float }
    /// Builds the lookup key "<NameVal>;<IdVal>" for this record.
    member this.StringKey() =
        let namePart = this.NameVal.ToString()
        let idPart = this.IdVal.ToString()
        String.concat ";" [ namePart; idPart ]
// example code starts here

/// Random source shared by the example's generated observables.
let rnd = new System.Random()
/// Cache of already-created observables, looked up by the key from FooType.StringKey().
let fooListeners = new System.Collections.Generic.Dictionary<_, _>()
/// Returns the observable registered for `foo`'s key, creating and caching a
/// new one on first use.
///
/// A new observable starts from `foo` and never terminates. Each step shifts
/// RetVal by a random amount in [-0.5, 0.5) and is scheduled after a random
/// delay of up to 2000 ms.
let AddAFoo (foo : FooType) =
    let fooId = foo.StringKey()
    // Single lookup instead of ContainsKey followed by the indexer.
    match fooListeners.TryGetValue(fooId) with
    | true, existing -> existing
    | _ ->
        // System.Random is not thread-safe, and every generated observable
        // shares `rnd`. Their callbacks may be invoked concurrently by Rx's
        // timer scheduling, so all access is serialized through a lock.
        let nextDouble () = lock rnd (fun () -> rnd.NextDouble())
        let myObs =
            ObsGenerate
                foo
                (fun _ -> true)
                (fun state -> { state with RetVal = state.RetVal + nextDouble() - 0.5 })
                (fun state -> state)
                (fun _ -> System.TimeSpan.FromMilliseconds(nextDouble() * 2000.0))
        fooListeners.Add(fooId, myObs)
        myObs
/// Observables for the four example foos with ids 6 through 9, registered via AddAFoo.
let fooInit =
    [ for index in 6 .. 9 ->
        AddAFoo { NameVal = string index + "st"
                  IdVal = index
                  RetVal = float index + 1.0 } ]
/// Observable of arrays holding the latest RetVal of every foo in fooInit.
let fooValuesArray =
    let retValStreams =
        fooInit |> List.map (obsMap (fun (foo: FooType) -> foo.RetVal))
    combineLatestListToArray retValStreams
/// Subscription that prints each combined array of foo values.
let mySub =
    let printFooValues fooVals = printfn "fooArray: %A" fooVals
    subscribe printFooValues fooValuesArray
// Evaluate everything up to this point to start the example.
// Evaluate the final line on its own to unsubscribe.
mySub.Dispose()
我对这段代码有两个问题:
有没有更聪明的方法可以将 observables 合并到数组中? (因为我需要合并更大的数组,它变得非常冗长)
我想对更新进行节流(throttle)。也就是说,我希望把发生在同一个时间窗口(比如半秒)内的所有更新合并成数组上的一次更新来处理。理想情况下,这个窗口只在第一个更新到来时才打开:例如,如果 2 秒内没有任何更新,随后来了一个更新,那么我们再等待 0.5 秒,把这期间到达的后续更新都包含进来,然后才触发 observable。我不希望它在没有任何可观察对象触发的情况下仍然每 0.5 秒定期发布一次。希望这个描述足够清楚。
更新:我已决定接受其中一个 F# 答案,但我还没有给予 C# 答案应有的重视。我希望能尽快认真地研究它们。
【问题讨论】:
标签: f# system.reactive