【问题标题】:Maximize parallelism with Java 8 Streams [duplicate]使用 Java 8 Streams 最大化并行性 [重复]
【发布时间】:2017-04-27 10:25:29
【问题描述】:

Java Streams 的并行度取决于您的硬件。但是,如果我想始终拥有最大的并行度怎么办?

考虑下面的代码。我希望这 10 个任务中的每一个都同时运行 100 毫秒。

long runUntil = System.currentTimeMillis() + 100;
IntStream.range(0, 10).parallel().forEach(i ->
{
    int cnt = 0;
    while(System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
});

但是,我得到的结果是:

2: 56443
1: 67506
4: 74693
6: 70549
0: 0
3: 0
5: 0
7: 0
8: 0
9: 0

所以只有 4 个任务并行执行,而第 5 个任务只有在前 4 个任务之一完成时才开始。我希望所有任务大约在同一时间开始,而不是互相等待。

我不同意它是 Custom thread pool in Java 8 parallel stream 的副本,因为这个问题是关于运行缓慢的任务会阻塞其他任务,而就我而言,我只想知道如何(如果可以的话)最大化并行性使用 Stream API 时。

【问题讨论】:

  • 预期结果是什么?
  • 看起来你有 4 个核心。
  • '同时运行 100 毫秒':您希望所有任务 a) 同时开始,还是 b) 每个任务在运行 100 毫秒后才完成?
  • @wvdz 如果您要采用的解决方案是 java.util.concurrent.ForkJoinPool.common.parallelism = 10,那可能是 非常错误的。 你正在做的是为整个应用程序启用并行性,这不是很好。每次您要使用并行流时,将使用 forkjoinpool(在您无法控制的地方)将使用 10 个线程。您可能想分享您想要实现的确切目标。
  • @StoyanDekov 我不同意它是完全重复的。用例不同。我真的很想同时执行所有事情。对链接问题的最高投票答案不是我的问题的解决方案。

标签: java java-stream


【解决方案1】:

当您执行并行流时,您在后台调用 ForkJoinPool,该池的工作线程数等于以下结果:

 Runtime.getRuntime().availableProcessors(); // 4 in your case

所以并行任务由 4 个线程并发执行。

到你开始第 5 个任务的时候(已经过了 100 毫秒),所以这个条件:

  while(System.currentTimeMillis() < runUntil)

报告错误,因此仅为零。

要解决此问题,您可以自己创建一个 ForkJoinPool,如本答案中所述 (https://stackoverflow.com/a/22269778/2947592)

long runUntil = System.currentTimeMillis() + 1000;
ForkJoinPool forkJoinPool = new ForkJoinPool(10); // 10 Threads
forkJoinPool.submit(() ->
IntStream.range(0, 10).parallel().forEach(i -> {
    int cnt = 0;
    while (System.currentTimeMillis() < runUntil)
        cnt++;
    System.out.println(i + ": " + cnt);
})).get();

【讨论】:

  • @wvdz 我已经看到了您提供的答案,是的,修复了您的问题,但是映射更多线程而不是您拥有的内核并不是一个好主意。您可以解释一下您的用例吗?我真的很感兴趣。如果您只想测量每个核心添加的速度有多快,那么为 4 个核心分配 10 个线程只会让您的结果变得更糟。
  • 这可能不是一个好主意,但这是我工作的必要条件。另一种解决方案是手动启动线程,但我宁愿使用流 API。
  • @wvdz 在生产环境中,您需要手动触发线程,以便人们知道发生了什么。你可以mapToObject 并在那里创建线程。
  • 如果这是真的,那么这应该是答案。
  • @wvdz 如果这是您想要的答案,那么应该将其作为副本关闭。
【解决方案2】:

所以我已经找到了我自己问题的答案。问题是它真的感觉像一个黑客,而不是一个正确的解决方案。我不会在生产环境中使用它。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

现在我会得到这样的结果,当我让它运行 1000 毫秒时:

9: 40158551
8: 41835052
0: 39087202
4: 37993773
6: 37993442
7: 36503041
2: 40076207
1: 37894657
5: 35785211
3: 40086037

我认为我的要求是合理的,但很惊讶它显然不受流 API 的支持。

【讨论】:

猜你喜欢
  • 2020-07-03
  • 1970-01-01
  • 2018-05-17
  • 2023-03-21
  • 2014-11-05
  • 2015-02-18
  • 1970-01-01
  • 2019-04-29
  • 2017-08-25
相关资源
最近更新 更多