【问题标题】:Collect partial results from parallel streams从并行流中收集部分结果
【发布时间】:2015-08-06 22:39:44
【问题描述】:

Java8中,处理两个并行流中的项目对,如下所示:

final List<Item> items = getItemList();
final int l = items.size();
List<String> results = Collections.synchronizedList(new ArrayList<String>());
IntStream.range(0, l - 1).parallel().forEach(
    i -> {
        Item item1 = items.get(i);
        int x1 = item1.x;
        IntStream.range(i + 1, l).parallel()
            .forEach(j -> {
                Item item2 = items.get(j);
                int x2 = item2.x;
                if (x1 + x2 < 200) return;
                // code that writes to ConcurrentHashMap defined near results
                if (x1 + x2 > 500) results.add(i + " " + j);
            });
    }
);

每个流对都写入ConcurrentHashMap,并且根据某些条件,它可能会通过调用return; 来终止流执行,或者它可能会写入同步列表。

我想让流返回像return i + " " + j 这样的结果,并将这些结果收集到外面的列表字符串中。它应该是部分的,因为必须支持不返回任何内容(以防x1 + x2 &lt; 200)。

实现这一目标的最省时(最快的代码)方法是什么?

【问题讨论】:

  • 请提供正确的编译代码。您的 results 被声明为数组,但您像列表一样使用它。 ConcurrentHashMap 在哪里?你到底在写什么?另请提供示例输入和所需的输出。现在有点不清楚你想要实现什么。
  • 如果x1 + x2 &gt; 500,您只会添加到results。为什么不使用收集器?另外......据我所知,你不能从这样的 foreach 中返回。
  • 在您的特定示例中,并行执行可能不会产生更快的性能。查看此问题及其最佳答案:stackoverflow.com/questions/23170832/…
  • 在关心时间效率之前,我认为你应该关心正确性。 (1) 如果您需要在特定条件下终止流,则不能并行化,否则您无法控制执行顺序,并且可能有对添加到 results 列表中,这些对在逻辑上出现在触发您的停止条件的对之后 @ 987654333@。 (2) 像这样的return; 绝对不是停止流执行的方法。

标签: parallel-processing java-8 java-stream


【解决方案1】:

在这个答案中,我不会讨论时间效率,因为有正确性问题需要事先处理。

正如我在 cmets 中所说,如果我们并行化流,则不可能在特定条件后停止流执行。否则,可能有一些对 (i,j) 已经在执行,它们在数字上位于触发停止条件 x1 + x2 &lt; 200 的对之后。 另一个问题是 lambda 中的 return;,它所要做的就是跳过第二个 if 以获取 x1 + x2 &lt; 200 所在的 j,但流将继续使用 j+1

在 Java 中没有直接的方法来停止流,但我们可以使用 allMatch 来实现,因为我们可以预期,一旦它找到 false 值,它将短路并返回 @987654330 @正确的方式。

所以,这将是您的代码的正确版本:

IntStream.range(0, l - 1).allMatch(i -> {
    int x1 = items.get(i).x;
    return IntStream.range(i + 1, l).allMatch(j -> {
        int x2 = items.get(j).x;
        if (x1 + x2 < 200) {
            return false;
        } else {
            if (x1 + x2 > 500) results2.add(i + " " + j);
            return true;
        }
    });
});

对于以下示例,使用构造函数Item(int x, int y)

final List<Item> items = Arrays.asList(
        new Item(200, 0),
        new Item(100, 0),
        new Item(500, 0),
        new Item(400, 0),
        new Item(1, 0));

我的版本中results的内容是:

[0 2, 0 3, 1 2]

使用您的代码(每次执行的顺序和元素都不同):

[2 4, 2 3, 1 2, 0 3, 0 2]

【讨论】:

    【解决方案2】:

    我认为这会更有效(虽然没有进行任何微基准测试):

    IntStream.range(0,l-1).forEach(
        i -> IntStream.range(i+1,l)
                      .filter(j -> items.get(i).x + items.get(j).x > 500)
                      .forEach(j -> results.add(i + " " + j)));
    

    但是,如果我真的担心执行此操作所花费的时间,我会更加注意items 使用了什么样的List 实现。在进入 lambda 之前,甚至可以将列表转换为 HashMap&lt;Integer, Item&gt;。例如,如果 itemsLinkedList,则对 lambda 的任何改进都可能无关紧要,因为 items.get() 会一直吃光。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多