【问题标题】:Parallellize a for loop in Java using multi-threading使用多线程并行化 Java 中的 for 循环
【发布时间】:2017-09-25 03:07:05
【问题描述】:

我对 java 很陌生,我想使用执行器服务或使用 java 中的任何其他方法并行化嵌套的 for 循环。我想创建一些固定数量的线程,这样 CPU 就不会完全被线程占用。

    for(SellerNames sellerNames : sellerDataList) {
        for(String selleName : sellerNames) {
        //getSellerAddress(sellerName)
        //parallize this task
        }
    }

sellerDataList 的大小 = 1000,sellerNames 的大小 = 5000。

现在我想创建 10 个线程,并为每个线程平均分配相等的任务块。那是针对第 i 个 SellerDataList,第一个线程应该获取 500 个名称的地址,第二个线程应该获取下一个 500 个名称的地址,依此类推。
完成这项工作的最佳方法是什么?

【问题讨论】:

  • 您可以使用 FixedSizeThreadPool 并为每个卖家名称提交一个任务。请参阅docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 当您提交所有任务时,在 Executor 上发出 shutdown ,然后等待它完成 awaitTermination
  • 也许Collections .stream().parallel() 表单是一个选项?
  • 我已经尝试过 Collections .stream().parallel()。它受核心数量的限制。
  • 语法错误。你写foreach的地方应该是for
  • @Jhutan Debnath,“受 [the] 核心数量的限制”实际上并没有太大的限制。 “parallize”应拼写为“parallelize”。

标签: java multithreading parallel-processing threadpool


【解决方案1】:

有两种方法可以让它并行运行:Streams 和 Executors。

使用流

您可以使用并行流并将其余部分留给 jvm。在这种情况下,您对何时发生的事情没有太多控制权。另一方面,您的代码将易于阅读和维护:

    sellerDataList.stream().forEach(sellerNames -> {
        Stream<String> stream = StreamSupport.stream(sellerNames.spliterator(), true); // true means use parallel stream
        stream.forEach(sellerName -> {
            getSellerAddress(sellerName);
        });
    });

使用 ExecutorService

假设,您需要 5 个线程,并且您希望能够等到任务完成。然后你可以使用一个有 5 个线程的固定线程池并使用Future-s 这样你就可以等到它们完成了。

    final ExecutorService executor = Executors.newFixedThreadPool(5); // it's just an arbitrary number
    final List<Future<?>> futures = new ArrayList<>();
    for (SellerNames sellerNames : sellerDataList) {
        for (final String sellerName : sellerNames) {
            Future<?> future = executor.submit(() -> {
                getSellerAddress(sellerName);
            });
            futures.add(future);
        }
    }
    try {
        for (Future<?> future : futures) {
            future.get(); // do anything you need, e.g. isDone(), ...
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

【讨论】:

  • 现在在 Java 8 中你可以做到sellerNames.parallelStream().forEach(...)
【解决方案2】:

如果您使用并行流,您仍然可以通过创建自己的 ForkJoinPool 来控制线程。

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
  .collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
  () -> aList.parallelStream().reduce(0L, Long::sum)).get();

在这个网站上,它的描述非常好。 https://www.baeldung.com/java-8-parallel-streams-custom-threadpool

【讨论】:

    猜你喜欢
    • 2019-03-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-13
    • 2012-03-28
    相关资源
    最近更新 更多