【问题标题】:How to create multiple threads inside of a for loop如何在for循环内创建多个线程
【发布时间】:2021-05-11 20:28:02
【问题描述】:

您好,我对 Java 有点陌生。

我有一个方法,它接收一个映射,并将映射中的每个键值对写入一个文件。

我希望映射中的每个键值对都有一个线程运行,以便我可以同时创建多个文件。不确定执行此操作的正确方法是什么或如何使用执行器服务来完成此操作。

这是我正在尝试的一个非常简单的示例。而不是编写用于编写文件的所有代码,我只是在示例中使用 system.out.println:

public class CityWriter
{

 public static void main(String []args)
 {
    LinkedHashMap<Integer, ArrayList<City>> stateNumCitiesMap = new LinkedHashMap<Integer, ArrayList<City>>();

    stateNumCitiesMap  = retrieveStateCitiesMap();

    int numOfThreadsToExecuteAtATime = 10;
    
    ExecutorService executor = Executors.newFixedThreadPool(numOfThreadsToExecuteAtATime);
    
    for(Integer key : stateNumCitiesMap.keySet()) //Could have up to 50 key,values in map
    {   
        executor.execute(writeCitiesOfStateToFile(key, StateNumCitiesMap.get(key)));
    }
    
    executor.shutdown();

 }

public LinkedHashMap<Integer, ArrayList<Cities>> writeCitiesOfStateToFile(int stateNum, List<City> citiesList) 
{
    for(City city : citiesList)
    {
        System.out.println(stateNum +" "+ city);
    }
}

}//end of class

我的问题是它似乎没有在这里并行执行线程。此外,我不想一次运行超过 10 个线程,即使 for 循环会调用 executor 50 次。

请告诉我什么是最有效的方法。

【问题讨论】:

  • 你要写的文件真的很大吗?因为如果它们是小文件,多线程可能没用。如果它们很大,您可以尝试使用两个或三个线程并写入文件。当一个文件被写入后,您将使用另一个可用线程转到下一个文件。如果你想要一个关于多线程的总结,看看Tutorialpoint
  • 这可以编译吗?静态方法中没有 this writeCitiesOfStateToFile 不是 execute () 的有效参数。
  • 是的,文件非常大,写入文件之前方法中的实际代码需要一段时间才能完成。
  • 抱歉没有检查是否编译我当场做了一个简单的例子
  • 嗨@daniu。我希望能够多线程这个方法它必须是 Runnable 并返回 runnable 还是有其他方法?

标签: java multithreading concurrency executorservice


【解决方案1】:

实际上,如果我很好地理解了您的问题,那么您的代码将完全符合您的要求(当然,如果我们在您的代码 sn-p 中省略所有语法错误):

  • 它生成的线程不会超过 10 个,因为您已在此处指定 Executors.newFixedThreadPool(10) 您想要多少线程
  • 您的所有 x 映射条目都将作为潜在工作分配给 executor。然后 executor 将与所有 10 个线程并行运行它们中的每一个(但一次不超过 10 个作业)

您可以尝试这个 sn-p 并检查多个线程是否在并行执行该工作:

    public static void main(String[] args) {
        Map<Integer, List<String>> stateNumCitiesMap = new LinkedHashMap<>();

        for (int i = 0; i < 100; i++) {
            stateNumCitiesMap.put(i, Collections.singletonList("ABC"));
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (Integer key : stateNumCitiesMap.keySet()) {
            executor.execute(() -> writeCitiesOfStateToFile(key, stateNumCitiesMap.get(key)));
        }

        executor.shutdown();
    }

    public static void writeCitiesOfStateToFile(int stateNum, List<String> citiesList) {
        for (String city : citiesList) {
            System.out.println(stateNum + " " + Thread.currentThread().getName());
        }
    }

如果你不想一个作业一个作业交给执行者,可以一次性传一批。

 public static void main(String[] args) throws InterruptedException {
        Map<Integer, List<String>> stateNumCitiesMap = new LinkedHashMap<>();

        for (int i = 0; i < 100; i++) {
            stateNumCitiesMap.put(i, Collections.singletonList("ABC"));
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);

        List<Callable<Void>> jobs = new ArrayList<>();
        for (Integer key : stateNumCitiesMap.keySet()) {
            jobs.add(() -> {
                writeCitiesOfStateToFile(key, stateNumCitiesMap.get(key));
                return null;
            });
        }
        executor.invokeAll(jobs);

        executor.shutdown();
    }

    public static void writeCitiesOfStateToFile(int stateNum, List<String> citiesList) {
        for (String city : citiesList) {
            System.out.println(stateNum + " " + Thread.currentThread().getName());
        }
    }

【讨论】:

    【解决方案2】:

    您可以使用“invokeAll”方法进行多次执行,甚至可以获取它们的结果(无论是否完成)。即使他们是 50,它也会为他们使用 10 个线程。当所有任务完成时,将返回结果。像下面这样的,把它当作伪。

    Callable<int> callableTask = (fileName) -> {
    // implement write to the file
            return 0;
        };
    ExecutorService executor = Executors.newFixedThreadPool(10);
         
    List<Callable<int>> tasksList;
    for(City city : citiesList)
    {
        tasksList.add(callableTask(city.toString()));
    }
    
    executor.invokeAll(tasksList);
    

    【讨论】:

      【解决方案3】:

      在 java 中,您需要为希望在线程中运行的任何对象提供可运行接口,您没有这样做,而这正是执行程序所期望的。

       executor.execute(() -> your function )
      

      其实是

       executor.execute(new Runnable() {
                      @Override
                      public void run() {
                          // your code 
                      }
                  });
      

      该方法没有实现runnables,只有在runnable的run方法中才会被线程化

      原因是executor使用了一种观察者模式,你订阅了runnable给它,executor然后运行run方法

      来自 java 文档:

      Runnable 接口应该由其实例打算由线程执行的任何类实现。该类必须定义一个没有参数的方法,称为run。 该接口旨在为希望在活动时执行代码的对象提供通用协议。例如,Runnable 是由类 Thread 实现的。处于活动状态仅意味着线程已启动且尚未停止。

      也可以让方法本身返回一个runnable

       public static Runnable writeCitiesOfStateToFile(params) {
      
              return  () -> System.out.println(params);
          }
      

      【讨论】:

        【解决方案4】:

        Executor#execute 可能是同步的

        你说:

        这里好像没有并行执行线程

        你没有解释这种看法的原因。

        但是,仅供参考,情况可能确实如此。你在ExecutorService 上调用了execute 方法。

            for(Integer key : stateNumCitiesMap.keySet()) //Could have up to 50 key,values in map
            {   
                executor.execute(writeCitiesOfStateToFile(key, StateNumCitiesMap.get(key)));
            }
        

        execute 方法继承自 Executor 接口,ExecutorService 的超接口。该接口及其方法被记录为可能异步运行您的任务。引用 Javadoc:

        根据 Executor 实现的判断,该命令可以在新线程、池线程或调用线程中执行。

        所以您可能确实看到的是顺序非线程同步执行而不是异步。

        根据我对ExecutorService 方法submitinvokeAllinvokeAny 的阅读,这些方法似乎有望始终异步运行。

        我不相信这种同步行为正在发生,因为您选择了ExecutorService 实现。您对Executors.newFixedThreadPool 的调用会生成ThreadPoolExecutor 类型的对象。 Looking briefly at the source code 的那个具体类的 execute 方法,它似乎总是异步工作(尽管我不完全确定)。

        尽管如此,我们似乎应该在使用Executor#execute时总是假设异步执行。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2012-03-28
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-07-07
          • 1970-01-01
          相关资源
          最近更新 更多