【问题标题】:How can a threadpool be reused after shutdown关闭后如何重用线程池
【发布时间】:2011-12-05 01:43:12
【问题描述】:

我有一个 .csv 文件,其中包含超过 7000 万行,其中每一行用于生成 Runnable,然后由线程池执行。这个 Runnable 会在 Mysql 中插入一条记录。

更重要的是,我想记录 csv 文件的位置,以便 RandomAccessFile 定位。该位置被写入一个文件。我想在线程池中的所有线程都完成后写入此记录。因此调用了ThreadPoolExecutor.shutdown()。但是当更多行出现时,我又需要一个线程池。我怎样才能重用这个当前的线程池而不是创建一个新的。

代码如下:

public static boolean processPage() throws Exception {

    long pos = getPosition();
    long start = System.currentTimeMillis();
    raf.seek(pos);
    if(pos==0)
        raf.readLine();
    for (int i = 0; i < PAGESIZE; i++) {
        String lineStr = raf.readLine();
        if (lineStr == null)
            return false;
        String[] line = lineStr.split(",");
        final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
        //System.out.println("userId: "+log.getUserId()%512);

        pool.execute(new Runnable(){
            public void run(){
                try {
                    experienceService.insertExperienceLog(log);
                } catch (BaseException e) {
                    e.printStackTrace();
                }
            }
        });

        long end = System.currentTimeMillis();
    }

    BufferedWriter resultWriter = new BufferedWriter(
            new OutputStreamWriter(new FileOutputStream(new File(
                    RESULT_FILENAME), true)));
    resultWriter.write("\n");
    resultWriter.write(String.valueOf(raf.getFilePointer()));
    resultWriter.close();
    long time = System.currentTimeMillis()-start;
    System.out.println(time);
    return true;
}

谢谢!

【问题讨论】:

    标签: java concurrency


    【解决方案1】:

    documentation 中所述,您不能重复使用已关闭的ExecutorService。我建议不要使用任何解决方法,因为 (a) 它们可能无法在所有情况下都按预期工作; (b) 你可以使用标准类实现你想要的。

    你必须要么

    1. 实例化一个新的ExecutorService;或

    2. 不终止ExecutorService

    第一种方案很容易实现,我就不细说了。

    第二个,因为你想在所有提交的任务完成后执行一个动作,你可以看看ExecutorCompletionService并改用它。它包装了一个ExecutorService,它将执行线程管理,但可运行对象将被包装成一些东西,当它们完成时会告诉ExecutorCompletionService,因此它可以向您报告:

    ExecutorService executor = ...;
    ExecutorCompletionService ecs = new ExecutorCompletionService(executor);
    
    for (int i = 0; i < totalTasks; i++) {
      ... ecs.submit(...); ...
    }
    
    for (int i = 0; i < totalTasks; i++) {
      ecs.take();
    }
    

    ExecutorCompletionService 类上的方法 take() 将阻塞,直到任务完成(正常或突然)。它将返回一个Future,因此您可以根据需要检查结果。

    我希望这可以帮助你,因为我没有完全理解你的问题。

    【讨论】:

    • 如何强制停止线程或取消所有任务?使用 exectuer.shutdownNow() ?即使使用这个包装器也能正常工作吗?
    【解决方案2】:

    在 ExecutorService 上调用 shutdown 后,no new Task will be accepted。这意味着您必须为每一轮任务创建一个新的 ExecutorService。

    但是,Java 8 ForkJoinPool.awaitQuiescence 被引入。如果您可以从普通的 ExecutorService 切换到 ForkJoinPool,则可以使用此方法等待 ForkJoinPool 中没有更多任务在运行,而无需调用关闭。这意味着您可以用 Tasks 填充 ForkJoinPool,等到它为空(静止),然后再次开始用 Tasks 填充它,等等。

    【讨论】:

      【解决方案3】:

      创建和分组所有任务并使用invokeAll 将它们提交到池中(仅在所有任务成功完成时返回)

      【讨论】:

      • 使用ExecutorCompletionServiceinvokeAll 的更好方法是什么?
      猜你喜欢
      • 2014-03-10
      • 2011-04-28
      • 1970-01-01
      • 1970-01-01
      • 2010-09-13
      • 2018-10-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多