【问题标题】:Can I benefit from a reactive library in this scenario?在这种情况下,我可以从反应式库中受益吗?
【发布时间】:2014-05-28 01:14:52
【问题描述】:

App 1 向 App 2 发送请求。App 2 执行以下步骤并将响应返回给 App 1。我想知道 App 2 是否可以从使用 RxJava、Reactor 等反应式库中受益。如果可以,请解释一下如何?

  1. 向所有 7 个数据源服务发送 HTTP Post 请求
  2. 等待他们的响应并解析所有响应
  3. 合并所有响应
  4. 向应用 1 返回响应

【问题讨论】:

  • 我知道:您想在这里使用reactive 解决方案,但我想添加一些噪音。在 EIP 中,它调用split-aggregate,Spring Integration 提供了开箱即用的解决方案。在这种情况下,拆分器可以将项目发送到ThreadPoolExecutor,或者,如果您愿意,可以发送到 Reactor 或仅发送到 RingBuffer。每个data source service 应该将其结果发送到aggregator,最后一个只是发送合并响应。
  • @ArtemBilan 我熟悉 EIP,并且多年来一直在使用骆驼和 Spring 集成。但我的问题具体是关于在这种特定情况下被动响应的好处。

标签: java reactor rx-java


【解决方案1】:

这是您可能发现的反应式库的经典用例! :)

“反应式”架构的关键部分是它们可以响应事件而不是等待结果。 RxJava 通过Observable 促进了这一点,而 Reactor 通过几种不同的机制实现了这一点。在 Reactor 中,您可以使用普通的Reactor 并在Event 上设置replyTo,您可以使用StreamPromise 来组成一个处理链,就像RxJava 的Observable,您可以使用 Processor 进行高速 RingBuffer 处理,或者您可以使用 ForkJoinPool 进行简单的 fork/join 样式处理。当然,有很多选项,但每个选项都旨在在特定用例中工作,而不会影响其他用例。 Reactor 框架不是单一的可调节扳手。这是一套尺寸完全符合您需要的扳手。

在这种特殊情况下,重要的部分是通过并发完成大量工作来实现并行性,因为您的数据源 IO 可能会阻塞线程。 RxJava 有一个可插入的执行模型,但它是相当粗粒度的。 Reactor 的优势之一是高效、细粒度的任务调度支持以及您可以轻松地对结果做出反应。

由于您的用例非常简单并且更接近标准的ThreadPoolExecutor 情况,我可能会想使用the ForkJoinPool in Reactor 1.1(这是全新的)。 ForkJoinPool 为您提供了一个 Promise<ImmutableList<T>>,它连接了您提交到池的所有任务的结果,该池由标准 ThreadPoolExecutor 支持。它基本上是围绕标准线程池的“反应式”包装器,因此需要很少的开销,但提供了对所有提交任务的完成做出反应的灵活性。类似于 RxJava 的Observable.merge()

【讨论】:

  • +1。如果我想在 7 个数据源中的一些失败的情况下继续怎么办?对于这个场景,你建议哪一个:RxJava 还是 Reactor?我想将线程管理委托给 Spring 的 TaskExecutor 抽象。在 RxJava 中可以吗?
  • @Pangea 听起来你可能需要fork 方法而不是join,它给你一个Stream,你可以在上面设置一个when() 错误处理程序和一个“正常”Consumer 获取结果。您必须自己跟踪会计,以了解您获得了多少结果与您预期的结果。 Reactor 的 Spring 支持还有一个 AsyncListenableTaskExecutor 实现,它使用您配置的 Reactor Dispatcher。这对于大容量可能是一个更好的选择,因为“普通”线程池在高负载时效率很低。
猜你喜欢
  • 1970-01-01
  • 2011-04-08
  • 2022-06-14
  • 1970-01-01
  • 2012-03-28
  • 2016-12-16
  • 2011-04-20
  • 2020-04-18
  • 1970-01-01
相关资源
最近更新 更多