【问题标题】:How to access running threads inside ThreadPoolExecutor?如何访问 ThreadPoolExecutor 中正在运行的线程?
【发布时间】:2016-06-04 22:15:22
【问题描述】:

我有一个正在运行的线程队列,并希望在它执行时公开它的一些数据,以监控进程。

ThreadPoolExecutor 提供对其队列的访问,我可以遍历这些对象来调用我重写的 toString() 方法,但这些只是等待执行的线程。

有没有办法访问当前正在运行的线程来调用我的方法?或者一般来说有更好的方法来完成这项任务?

为了更清楚地说明目的,这里有一些一般想法的代码:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;


    public GetDataTask(String source, String dataType) {
        this.source = source;
        this.dataType = dataType;
    }

    @Override
    public void run() {
        //do stuff that affects pageNumber and dataBlocksParsed
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

还有一个持有执行者的类:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        for (Runnable r : executor.getActiveThreads()) {
            info.append(((GetDataTask) r).toString()).append('\n');
        }
        return info.append(executor.toString()).toString();
   }
}

【问题讨论】:

  • 线程还是任务?它们是有区别的!从任何正在执行的任务中,您都可以使用Thread.currentThread 来访问正在执行它的线程并检索信息。您当然可以保留对所有已提交任务的引用以从中检索信息。
  • 为什么会有一个正在运行的线程队列?你的意思是你有一个线程池?如果你想监控你的任务正在做什么,我建议你让任务定期更新一些关于他们正在做什么的信息,这样你就可以监控了。
  • 哦,刚刚意识到,我可能误读了这个问题。您是否已经在使用 ExecutorService?我上面写的只有在这样的情况下才有意义。
  • 是的,伙计们,对不起。我已经编辑了从 ExecutorService 到 ThreadPoolExecutor 的原始消息,但是 vektor 的编辑反转了它,我没有注意到。
  • 你能详细说明一下你想在哪里公开吗?也许有一点代码?例如,如果您只想打印出承担任务的线程名称,您可以在 run 方法中执行此操作,只需添加 using Thread.currentThread 并打印其名称和任务的标识符。

标签: java multithreading threadpoolexecutor java-threads


【解决方案1】:

这样包装 Runnable 怎么样。

static class MonitorRunnable implements Runnable {

    static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>());

    private final Runnable runnable;

    public MonitorRunnable(Runnable runnable) {
        this.runnable = runnable;
    }

    @Override
    public void run() {
        activeTasks.add(runnable);
        runnable.run();
        activeTasks.remove(runnable);
    }
}

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new MonitorRunnable(new GetDataTask(source, dataType)));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()
        synchronized (MonitorRunnable.activeTasks) {
            for (Runnable r : MonitorRunnable.activeTasks) {
                info.append(((GetDataTask) r).toString()).append('\n');
            }
        }
        return info.append(executor.toString()).toString();
   }
}

【讨论】:

  • 一开始没看懂,现在明白了。有趣的方法,我会尝试的。谢谢!
  • 所以,我最终选择了这个选项,因为它看起来更快应用于我现有的代码并且感觉更优雅一些。到目前为止效果很好。还将toString() 添加到MonitorRunnable 以查看来自池队列的任务数据。谢谢大家!特别感谢@Fildor 在这方面付出的额外努力。 :)
【解决方案2】:

每当您将线程添加到队列中时,也将其添加到第二个数据结构中,例如HashSet。然后,如果您需要访问正在运行的线程,您可以检查ExecutorService 的队列以找到仍在等待执行的线程:HashSet 中的每个线程仍不在ExecutorService 的队列中当前正在运行。

【讨论】:

  • 托马斯,谢谢你的建议。我已经编辑了原始问题,但是您的方法无论如何都应该有效。除了我不确定如何正确管理完成的线程。我需要手动从集合中删除它。但如果没有更优雅的方式......
  • @and_rew ThreadPoolExecutor 中的线程不会像创建线程并启动它时那样“完成”。它们将被重复使用。将完成的是您提交到队列以执行的可运行或可调用。
  • 您不一定需要“听”线程来完成。相反,每当您想访问正在运行的线程时,遍历HashSet,忽略ExecutorService 队列中的线程,并跳过那些(并删除它们)已完成的线程。
【解决方案3】:

就像我在评论中写的那样。我会主动更新共享统计对象方法:

我会像这样更改任务:

public class GetDataTask implements Runnable {
    private String pageNumber;
    private int dataBlocksParsed;
    private String source;
    private String dataType;
    HashMap<GetDataTask,String> statistics


    public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) {
        this.source = source;
        this.dataType = dataType;
        this.statistics = statistics;
    }

    @Override
    public void run() {
        // you'll probably want to immediately have stats available:
        statistics.put(this, this.toString());

        //do stuff that affects pageNumber and dataBlocksParsed
        // vv this will probably be inside your "do stuff" loop
        statistics.put(this, this.toString());
        // loop end

        // if you do not want stats of finished tasks, remove "this" here.
    }

    @Override
    public String toString() {
        return "GetDataTask{" +
            "source=" + source +
            ", dataType=" + dataType +
            ", pageNumber=" + pageNumber +
            ", dataBlocksParsed=" + dataBlocksParsed +
            '}';
    }
}

和经理:

public class DataParseManager {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));

    private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();       

    public void addParseDataTask(String source, String dataType) {
        executor.execute(new GetDataTask(source, dataType, stats));
    }

    // here's the method that I need
    public String getInfo() {
        StringBuilder info = new StringBuilder();
        //and here's the method that I'm missing - executor.getActiveThreads()

        // >>> iterate "stats"'s values to build the info string ...            

        return info.append(executor.toString()).toString();
   }
}

更新

您可以通过迭代 Map 的 keys(它们是正在执行的任务)并在它们上调用 toString 来轻松更改提取信息的方法。不过,这与 saka 的方法非常相似。也许你觉得他更舒服。

【讨论】:

    【解决方案4】:

    由于您可以控制使用的执行器,我将使用ThreadPoolExecutorbeforeExecuteafterExecute 方法来跟踪正在运行的任务并使用它来创建getActiveTasks 方法。

    import java.util.Set;
    import java.util.concurrent.*;
    
    public class ActiveTasksThreadPool extends ThreadPoolExecutor {
    
        private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>();
    
        public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
    
            activeTasks.put(r, Boolean.TRUE);
            super.beforeExecute(t, r);
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
    
            super.afterExecute(r, t);
            activeTasks.remove(r);
        }
    
        public Set<Runnable> getActiveTasks() {
            // the returned set will not throw a ConcurrentModificationException.
            return activeTasks.keySet();
        }
    
        public static void main(String[] args) {
    
            final int maxTasks = 5;
            ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
            try {
                System.out.println("Active tasks: " + tp.getActiveTasks());
                final CountDownLatch latch = new CountDownLatch(1); 
                for (int i = 0; i < maxTasks; i ++) {
                    final int rnumber = i;
                    tp.execute(new Runnable() {
                        @Override
                        public void run() {
                            try { latch.await(); } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        @Override
                        public String toString() {
                            return "Runnable " + rnumber;
                        }
                    });
                }
                Thread.sleep(100L); // give threads a chance to start
                System.out.println("Active tasks: " + tp.getActiveTasks());
                latch.countDown();
                Thread.sleep(100L); // give threads a chance to finish
                System.out.println("Active tasks: " + tp.getActiveTasks());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                tp.shutdownNow();
            }
        }
    
    }
    

    【讨论】:

      【解决方案5】:

      您只需要将对正在运行的线程的引用存储在 ThreadPoolExecutor 内将触发的某处,在其他答案之上添加,这是一个小型应用程序的示例,它每 1 秒读取一次在 ThreadPoolExecutor 内运行的线程状态直到关机:

      package sample;
      
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.Random;
      import java.util.concurrent.Executors;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      
      public class Test {
      
          public static void main(String[] args) {
              ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
      
              for (int i = 1; i <= 10; i++)
              {
                  Task task = new Task("Task " + i);
                  executor.execute(task);
              }
      
              executor.shutdown();
      
              try {
                  while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                      System.out.println("Awaiting completion of threads, threads states: " + Task.getThreadsStateCount());
                  }
      
              } catch (InterruptedException e) {
              }
      
              System.out.println("Executor shutdown -> " + executor.isShutdown());
          }
      }
      
      class Task implements Runnable {
      
          static final List<Thread> activeTasks = Collections.synchronizedList(new ArrayList<>());
          static final Random r = new Random();
      
          private String name;
      
          public Task(String name) {
              this.name = name;
          }
      
          @Override
          public void run() {
              Thread t = Thread.currentThread();
              System.out.println("current thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
              activeTasks.add(t);
      
              try {
                  int tries = 0;
      
                  while (tries < 10) {
                      int randomNum = r.nextInt(10000);
                      // do some expensive computation
                      for(int i = 0; i < 4; i++) {
                          isPrime(r.nextLong());
                      }
      
                      // now sleep
                      Thread.sleep(randomNum);
                      tries++;
                  }
      
              } catch (InterruptedException e) {
              }
      
              System.out.println("completed task for thread : " + t.getName() + " group " + t.getThreadGroup() + " state " + t.getState());
          }
      
          static boolean isPrime(long n)
          {
              if (n <= 1)
                  return false;
              if (n <= 3)
                  return true;
      
              if (n % 2 == 0 || n % 3 == 0)
                  return false;
      
              for (int i = 5; i * i <= n; i = i + 6)
                  if (n % i == 0 || n % (i + 2) == 0)
                      return false;
      
              return true;
          }
      
          public static String getThreadsStateCount() {
              return "NEW: " + getCountThreadsState(Thread.State.NEW) +
                      " ,RUNNABLE: " + getCountThreadsState(Thread.State.RUNNABLE) +
                      " ,WAITING: " + getCountThreadsState(Thread.State.WAITING) +
                      " ,TIMED_WAITING: " + getCountThreadsState(Thread.State.TIMED_WAITING) +
                      " ,BLOCKED: " + getCountThreadsState(Thread.State.BLOCKED) +
                      " ,TERMINATED: " + getCountThreadsState(Thread.State.TERMINATED);
          }
      
          public static long getCountThreadsState(Thread.State state) {
              return activeTasks.stream().filter(x -> x.getState() == state).count();
          }
      }
      

      // 打印类似:

      等待线程完成,线程状态:NEW: 0 ,RUNNABLE: 1 ,WAITING: 0 ,TIMED_WAITING: 9 ,BLOCKED: 0 ,TERMINATED: 0

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-05-10
        • 2016-10-30
        • 1970-01-01
        • 2012-01-21
        • 2017-03-22
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多