【问题标题】:How to perform multi-threaded map/reduce using Reactor framework 2.x?如何使用 Reactor 框架 2.x 执行多线程 map/reduce?
【发布时间】:2015-01-15 19:18:05
【问题描述】:

我之前向 this question 询问过 Reactor 1.x:

假设我有一个Collection<Map>。我想:

同时将每个 Map 实例转换为 Foo 类型的对象(每个实例完全独立于另一个 - 无需串行/迭代地转换每个实例)。

当所有这些都被转换后,我想要一个方法 onReduce(Collection<Foo> foos) 被调用 - 参数包含所有生成的 Foo 实例。

但我们似乎无法为 Reactor 2.x 找到等效的解决方案 - 只是单线程。

如何在 Reactor 2.x 中执行多线程 map/reduce?例如,您如何使用基于 ExecutorService 的 Dispatcher 执行此操作?

【问题讨论】:

  • 只是好奇,您没有为此使用 ForkJoin 框架有什么特别的原因吗?
  • @JohnVint Reactor 为除了 map/reduce aka fork/join 之外的各种流式传输行为提供了相当全面的 API。我们将在我们的应用程序的各个地方利用这些其他东西,因此它有助于在具有共享分类的通用框架上“标准化”,从而使整个代码库更具凝聚力/可维护性。

标签: java multithreading concurrency mapreduce reactor


【解决方案1】:

现在使用 Reactor 2.0 实际上很容易。你可以这样做:

List<Map<String, Object>> data = readData(); // <1>

Streams.from(data)
       .flatMap(m -> Streams.just(m)
                            .dispatchOn(Environment.cachedDispatcher()) // <2>
                            .map(ignored -> Thread.currentThread().getName()))
       .buffer() // <3>
       .consume(s -> System.out.println("s: " + s)); // <4>
  1. 根据输入数据创建Stream
  2. 为每个Map 创建一个新的Stream 并在给定的Dispatcher 上调度地图操作。
  3. 缓冲所有值直到完成,当集合清空时将向下游发送。
  4. 消费列表,它是来自子流的负载平衡转换的结果。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多