【问题标题】:Aggregate large number of observables into new observable将大量可观察对象聚合成新的可观察对象
【发布时间】:2010-12-06 10:05:53
【问题描述】:

比方说,我有 1000 个 observables。现在我想将所有事件聚合到一个新的 observable 中,一旦所有其他事件都发送了一个事件,它就会触发 OnNext。使用 Rx 的最佳方法是什么?

更新: Rx 论坛上有一些很好的反馈,尤其是 Dave Sexton。他展示了如何创建一个采用多个可观察对象的 Zip 扩展方法:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

【问题讨论】:

  • 这1000个observables的类型都是一样的吗?你认为聚合的 observable 是什么类型的?
  • 1000 个 observables 都是同一个类型,新的聚合可以是一个新的类型。例如。事件变为 AggregateEvent。
  • 您只想合并它们的最新值吗? IE。如果 Observable a 触发了两个事件,而 Observable b 只触发了一个,你想将 a 的第一个事件或 a 的最后一个事件与 b 的事件聚合吗?
  • @Richard Hein 让我们从 a 中的最后一个事件开始,因为我们可以假设它们按顺序到达,即所有 1000 个事件在一些可观察对象触发新事件之前到达。但是,将它们配对当然会更好,这样第一个 a 事件将与第一个 b 事件一起到达。
  • 我想你最好在 Rx 论坛上问这个问题。

标签: c# .net system.reactive


【解决方案1】:

F# 中有一个 MailboxProcessor...我会在 C# 中使用 SynchronizationContext 来达到同样的目的。给我几分钟,我会写一个例子。

顺便说一句:这是我在 F# 中的代码,它做了类似的事情......这将花费更多的精力,但在带有 Rx 的 C# 中仍然可行。

open System.Diagnostics

let numWorkers = 20
let asyncDelay = 100

type MessageForMailbox =
   | DataMessage of AsyncReplyChannel<unit>
   | GetSummary of AsyncReplyChannel<unit>

let main =
   let actor =
      MailboxProcessor.Start( fun inbox ->
         let rec loop acc =
            async {
               let! message = inbox.Receive()
               match message with
               | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
               | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
            }

         loop 0 // seed for acc
      )

   let codeBlocks = [for i in 1..numWorkers -> 
                        async {
                           do! Async.Sleep asyncDelay
                           return! actor.PostAndAsyncReply DataMessage
                        } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      actor.PostAndReply GetSummary
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main

【讨论】:

  • 嗯,所以您的意思是使用 SynchronizationContext.Send() 来同步所有创建事件的可观察对象?我有点明白你的 F# 代码做了什么,但我不够精明,无法完全理解它。
  • 我想你明白了。 RunSynchronously 使用异步工作流实现 ForkJoin。
  • +1:我以前从未见过 MailboxProcessor 的好例子。 :)
猜你喜欢
  • 1970-01-01
  • 2019-09-09
  • 1970-01-01
  • 1970-01-01
  • 2016-12-11
  • 1970-01-01
  • 2019-01-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多