【问题标题】:How to run Parallel tasks inside parallel tasks如何在并行任务中运行并行任务
【发布时间】:2015-08-23 19:16:19
【问题描述】:

我们可以在执行器服务中编写线程池执行器服务吗? 谁能建议如何在并行任务中运行并行任务?

假设有 10 个任务需要并行运行,在每个任务中我必须运行 100 个并行任务。大家有什么建议

ExecutorService executor1 = Executors.newFixedThreadPool(8);
for (int i = 0; i < 8; i++) {
    ExecutorService executor2 = Executors.newFixedThreadPool(115);
    for (int j = 0; j < 115; j++) {
        Runnable worker = new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"");
        executor2.execute(worker);
      }
  }
executor1.shutdown();

这是正确的方法吗?

【问题讨论】:

  • 你试过在另一个里面写一个线程池执行器吗?它抛出错误了吗?没用?如果是这样,请告诉我们错误...
  • 是设计问题吗?您在寻找设计建议或代码问题吗?
  • 不是问题..我正在寻找解决问题
  • 这与拥有 1000 个线程的 executor 有什么不同?因为任务是并行的
  • 是的,你可以。但是既然你不是在等待内部线程,为什么不直接重用外部执行器呢?

标签: java multithreading threadpool threadpoolexecutor


【解决方案1】:

这种方法可行,但我认为正确的解决方案取决于您未提及的其他一些事情。

简单案例

如果您要解决的问题非常简单、简短,在您的整个系统中不是很大的一部分,并且性能或稳定性也不是什么大问题。我什至根本不会使用线程池,只需使用parallel streams

您的代码可能如下所示:

IntStream.range(0,8).().forEach(i -> {
    IntStream.range(0,115).parallel().forEach(j -> {
        new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"").run();
    });
});

整个系统的主要部分

如果您尝试解决的问题确实是您系统的主要部分,那么当我查看您所描述的内容时,我实际上看到了一个代表外循环(i 循环)内部发生的事情的大任务,并且一个小任务,代表内部循环(j 循环)内发生的事情。如果这些任务在您的系统中占据主要角色,您可能希望将这些任务放在它们自己的类中,以使它们更具可读性、可重用性和以后更容易更改。 您的代码可能看起来像这样:

SmallTask​​.java

import java.text.MessageFormat;

public class SmallTask implements Runnable {
    private String identifier;

    public SmallTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(String.format(MessageFormat.format("Executing SmallTask with id: {0}", identifier)));
        // what ever happens in new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"").run()
    }
}

LargeTask.java

import java.text.MessageFormat;
import java.util.stream.IntStream;

public class LargeTask implements Runnable {
    private String identifier;

    public LargeTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(String.format(MessageFormat.format("Executing LargeTask with id: {0}", identifier)));
        IntStream.range(0, 115).parallel().forEach(j -> {
            new SmallTask(identifier + "-" + String.valueOf(j)).run();
        });
    }
}

Main.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {
        IntStream.range(0,8).parallel().forEach(i -> {
            new LargeTask(String.valueOf(i)).run();
        });
    }
}

我什至会更进一步,说大型任务或启动它的原因可能是event driven architecture 中的一个事件,您可以将您的系统组织为具有不同类型的事件,所有这些事件都可以异步执行。

性能和稳定性很重要

如果此代码在您的系统中运行非常频繁,那么我会考虑使用线程轮询,它允许您控制正在使用的线程数以及分配给运行 LargeTask 的线程是否与分配给运行 SmallTask​​ 的线程相同.

那么您的代码可能如下所示:

SmallTask​​.java

import java.text.MessageFormat;

public class SmallTask implements Runnable {
    private String identifier;

    public SmallTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(String.format(MessageFormat.format("Executing SmallTask with id: {0}", identifier)));
        // what ever happens in new UpdatecheckerTest(Region.getRegion(Regions.US_EAST_1),"").run()
    }
}

LargeTask.java

import java.text.MessageFormat;
import java.util.stream.IntStream;

public class LargeTask implements Runnable {
    private String identifier;

    public LargeTask (String identifier) {
        this.identifier = identifier;
    }

    @Override
    public void run() {
        System.out.println(String.format(MessageFormat.format("Executing LargeTask with id: {0}", identifier)));
        IntStream.range(0, 115).forEach(j -> {
            TasksExecutor.getSmallTaskExecutor().execute(new SmallTask(identifier + "-" + String.valueOf(j)));
        });
    }
}

TasksExecutor.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TasksExecutor {
    private static ExecutorService largeTasksExecutor = Executors.newFixedThreadPool(8);
    private static ExecutorService smallTaskExecutor = Executors.newFixedThreadPool(115);

    public static ExecutorService getLargeTaskExecutor () {
        return largeTasksExecutor;
    }

    public static ExecutorService getSmallTaskExecutor () {
        return smallTaskExecutor;
    }
}

Main.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) {
        IntStream.range(0,8).forEach(i -> {
            TasksExecutor.getLargeTaskExecutor().execute(new LargeTask(String.valueOf(i)));
        });
    }
}

如果需要,不要忘记添加关闭线程池的功能。 并且可能在每个任务和您想要管理它的特定线程池之间添加某种依赖注入,它将在以后为您提供更好的灵活性

如果您想更进一步,您可以改用消息传递框架,您可以在其中使用不同的队列来管理所有需要执行的任务。比如ZeroMQKafka

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-03
    • 2021-12-02
    • 1970-01-01
    • 1970-01-01
    • 2017-09-26
    • 1970-01-01
    相关资源
    最近更新 更多