【问题标题】:How do we know threadPoolExecutor has finished execution我们如何知道 threadPoolExecutor 已经执行完毕
【发布时间】:2013-08-07 18:51:34
【问题描述】:

我有一个向 MQ 发送消息的父线程,它为工作线程管理一个 ThreadPoolExecutor,该线程侦听 MQ 并将消息写入输出文件。我管理一个大小为 5 的线程池。所以当我运行我的程序时,我有 5 个带有消息的文件。直到这里一切正常。我现在需要在我的父线程中合并这 5 个文件。

我如何知道 ThreadPoolExecutor 已完成处理,以便开始合并文件。

public class ParentThread {
    private MessageSender messageSender;
    private MessageReciever messageReciever;
    private Queue jmsQueue;
    private Queue jmsReplyQueue;
    ExecutorService exec = Executors.newFixedThreadPool(5);

    public void sendMessages() {
        System.out.println("Sending");
        File xmlFile = new File("c:/filename.txt");
        List<String> lines = null;
        try {
            lines = FileUtils.readLines(xmlFile, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (String line : lines){
            messageSender.sendMessage(line, this.jmsQueue, this.jmsReplyQueue);
        }
        int count = 0;
        while (count < 5) {
            messageSender.sendMessage("STOP", this.jmsQueue, this.jmsReplyQueue);
            count++;
        }

    }

    public void listenMessages() {
        long finishDate = new Date().getTime();
        for (int i = 0; i < 5; i++) {
            Worker worker = new Worker(i, this.messageReciever, this.jmsReplyQueue);
            exec.execute(worker);
        }
        exec.shutdown();

        if(exec.isTerminated()){ //PROBLEM is HERE. Control Never gets here.
            long currenttime = new Date().getTime() - finishDate;
            System.out.println("time taken: "+currenttime);
            mergeFiles();
        }
    }
}

这是我的工人阶级

public class Worker implements Runnable {

    private boolean stop = false;
    private MessageReciever messageReciever;
    private Queue jmsReplyQueue;
    private int processId;
    private int count = 0;

    private String message;
    private File outputFile;
    private FileWriter outputFileWriter;

    public Worker(int processId, MessageReciever messageReciever,
            Queue jmsReplyQueue) {
        this.processId = processId;
        this.messageReciever = messageReciever;
        this.jmsReplyQueue = jmsReplyQueue;
    }

    public void run() {
        openOutputFile();
        listenMessages();
    }


    private void listenMessages() {
        while (!stop) {
            String message = messageReciever.receiveMessage(null,this.jmsReplyQueue);
            count++;
            String s = "message: " + message + " Recieved by: "
                    + processId + " Total recieved: " + count;
            System.out.println(s);
            writeOutputFile(s);
            if (StringUtils.isNotEmpty(message) && message.equals("STOP")) {
                stop = true;
            }
        }
    }

    private void openOutputFile() {
        try {
            outputFile = new File("C:/mahi/Test", "file." + processId);
            outputFileWriter = new FileWriter(outputFile);
        } catch (IOException e) {
            System.out.println("Exception while opening file");
            stop = true;
        }
    }

    private void writeOutputFile(String message) {
        try {
            outputFileWriter.write(message);
            outputFileWriter.flush();
        } catch (IOException e) {
            System.out.println("Exception while writing to file");
            stop = true;
        }
    }
}

我如何知道 ThreadPool 何时完成处理以便我可以进行其他清理工作?

谢谢

【问题讨论】:

    标签: java multithreading threadpoolexecutor


    【解决方案1】:

    如果您的 Worker 类实现了 Callable 而不是 Runnable,那么您将能够通过使用 Future 对象来查看线程何时完成,以查看线程是否返回了一些结果(例如布尔值,它会告诉您它是否已经完成执行与否)。

    看看下面的“8.Futures and Callables”@网站部分,它正是你所需要的imo: http://www.vogella.com/articles/JavaConcurrency/article.html

    编辑:因此,在所有 Future 都表明它们各自的 Callable 执行完成之后,可以安全地假设您的 executor 已完成执行并且可以手动关闭/终止。

    【讨论】:

      【解决方案2】:

      类似这样的:

          exec.shutdown();
      
          // waiting for executors to finish their jobs
          while (!exec.awaitTermination(50, TimeUnit.MILLISECONDS));
      
          // perform clean up work
      

      【讨论】:

      • 感谢 SilverHaze。正是我想要的。
      【解决方案3】:

      你可以像这样使用线程来监控 ThreadPoolExecutor

      import java.util.concurrent.ThreadPoolExecutor;
      
      public class MyMonitorThread implements Runnable {
           private ThreadPoolExecutor executor;
      
              private int seconds;
      
              private boolean run=true;
      
              public MyMonitorThread(ThreadPoolExecutor executor, int delay)
              {
                  this.executor = executor;
                  this.seconds=delay;
              }
      
              public void shutdown(){
                  this.run=false;
              }
      
              @Override
              public void run()
              {
                  while(run){
                          System.out.println(
                              String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                                  this.executor.getPoolSize(),
                                  this.executor.getCorePoolSize(),
                                  this.executor.getActiveCount(),
                                  this.executor.getCompletedTaskCount(),
                                  this.executor.getTaskCount(),
                                  this.executor.isShutdown(),
                                  this.executor.isTerminated()));
                          try {
                              Thread.sleep(seconds*1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                  }
      
              }
      
      }
      

      并添加

      MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
                  Thread monitorThread = new Thread(monitor);
                  monitorThread.start(); 
      

      到你的 ThreadPoolExecutor 所在的班级。

      它将每 3 秒显示一次您的线程池执行程序状态。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-01-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-01-29
        • 2020-02-06
        相关资源
        最近更新 更多