【问题标题】:Java: notify main class when all threads in threadpool are finished / same instance of object in different threadsJava:当线程池中的所有线程都完成时通知主类/不同线程中的相同对象实例
【发布时间】:2023-03-24 15:48:02
【问题描述】:

ThreadPoolExecutor 中的所有线程都完成时,如何通知我的主类实例化ThreadPoolExecutor

ThreadPoolExecutor threadPool = null;
ThreadClass threadclass1;
ThreadClass threadclass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

puclic MyClass(){
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

        threadClass1 = new ThreadClass;
        threadClass2 = new ThreadClass;

        threadPool.execute(threadClass1);
        threadPool.execute(threadClass2);

        //Now I would like to do something until the threadPool is done working
        //The threads fill a ConcurrentLinkedQueueand I would like to poll
        //the queue as it gets filled by the threads and output 
        //it to XML via JAX-RS

}

编辑 1

当我的线程从某个地方获取数据并将这些信息填充到 ConcurrentLinkedQueue 中时,我基本上想在 MyClass 中执行一些操作以使用结果更新 XML 输出。当所有线程都终止时,我想向实例化 MyClass 的 JAX-RS Web 服务返回 true,以便 Web 服务知道所有数据都已获取,它现在可以显示最终的 XML 文件

编辑 2

我将Queue 传递给线程,以便它们可以将项目添加到队列中。当一个driver 完成向articleQueue 添加项目时,我想在我的主类中执行一项操作,从Queue 轮询实体并将其交给response 对象以某种方式显示它。

当我将队列传递给线程时,它们是使用同一个对象还是使用对象的“副本”,以便线程内的更改不会影响主对象?那不是我想要的行为。当我检查DriverarticleQueue 的大小时,它是18DriverControllerarticleQueue 的大小是0

当一个线程向队列中添加了一些东西而不是我的 while 循环时,有没有更好的方法来做出反应?如何修改我的代码以访问不同类中的相同对象?

驱动控制器

public class DriverController {

    Queue<Article> articleQueue;

    ThreadPoolExecutor threadPool = null;
    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            maxPoolSize);

    public DriverController(Response response) {

        articleQueue = new ConcurrentLinkedQueue<Article>();
        threadPool = new ThreadPoolExecutor();
        Driver driver = new Driver(this.articleQueue);

        threadPool.execute(driver);
        // More drivers would be executed here which add to the queue

        while (threadPool.getActiveCount() > 0) {
            // this.articleQueue.size() gives back 0 here ... why?
            if(articleQueue.size()>0){
                response.addArticle(articleQueue.poll());
            }
        }

    }
}

驱动程序

public class Driver implements Runnable{

    private Queue<Article> articleQueue;

    public DriverAlliedElectronics(Queue articleQueue) {
        this.articleQueue = articleQueue;
    }

    public boolean getData() {
        // Here would be the code where the article is created ...

        this.articleQueue.offer(article);
        return true;
    }

    public void run() {
        this.getData();
        // this.articleQueue.size() gives back 18 here ...

    }
}

【问题讨论】:

  • 我不认为你想等待 threads 完成,你想等待 tasks 完成。这看起来对吗?你已经得到了一些关于等待线程的答案,这可能不是你真正想要的。
  • 让我的线程从某处获取数据并将此信息填充到 ConcurrentLinkedQueue 我基本上想在MyClass 中执行一些操作以使用结果更新 XML 输出。当所有线程终止时,我想将 true 返回到实例化 MyClass 的 JAX-RS Web 服务,这样 Web 服务就知道所有数据都已获取,它现在可以显示最终的 XML 文件。

标签: java multithreading threadpool


【解决方案1】:

你应该尝试使用下面的sn-p

//Now I would like to wait until the threadPool is done working
threadPool.shutdown();
while (!threadPool.isTerminated()) {
    try {
        threadPool.awaitTermination(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

【讨论】:

  • 您假设池需要在工作批处理结束时关闭。这可能不是真的。
  • ...但是在线程池用于特定任务的情况下,这是最简单的解决方案。为我工作。
【解决方案2】:

也许ExecutorCompletionService 可能适合您:

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

以上链接中的示例:

void solve(Executor e, Collection<Callable<Result>> solvers)
  throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null) 
            use(r);
    }
}

【讨论】:

  • 我想我一开始就错误地陈述了我的问题,所以现在我修复了它。基本上,问题归结为:我的线程如何通过实例化类访问公共对象,以及我的暗示类如何在线程完成执行时做一些事情。
【解决方案3】:

您应该使用submit,而不是使用execute。这将返回一个Future 实例,您可以在该实例上wait 以完成任务。这样您就不需要轮询或关闭池。

【讨论】:

    【解决方案4】:

    我认为没有办法明确地做到这一点。您可以轮询getCompletedTaskCount() 以等待它变为零。

    为什么不收集提交时返回的Future 对象并检查所有这些对象是否已完成?只需依次致电get() 即可。由于该呼叫阻塞,您只需依次等待每个呼叫,然后逐渐跌倒在集合中,直到您等待每个呼叫。

    或者,您可以提交线程,并在执行程序上调用shutdown()。这样,提交的任务就会被执行,然后调用 terminate() 方法。如果你覆盖它,那么一旦所有任务完成,你就会得到一个回调(很明显,你不能再次使用那个执行器)。

    【讨论】:

    • +1 等待期货。这正是使用 executor 的正确方式。
    【解决方案5】:

    reference documentation来看,你有几个选择:

    ThreadPoolExecutor threadPool = null;
    ThreadClass threadclass1;
    ThreadClass threadclass2;
    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);
    
    puclic MyClass(){
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);
    
        threadClass1 = new ThreadClass;
        threadClass2 = new ThreadClass;
    
        threadPool.execute(threadClass1);
        threadPool.execute(threadClass2);
    
        //Now I would like to wait until the threadPool is done working
    
        //Option 1:  shutdown() and awaitTermination()
        threadPool.shutDown();
        try {
            threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        //Option 2:  getActiveCount()
        while (threadPool.getActiveCount() > 0) {
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException ignored) {}
        }
    
        //Option 3:  getCompletedTaskCount()
        while (threadPool.getCompletedTaskCount() < totalNumTasks) {
            try {
                Thread.sleep(1000);
            }
            catch (InterruptedException ignored) {}
        }
    
    }
    

    综合考虑,我认为shutdown()awaitTermination() 是三者中的最佳选择。

    【讨论】:

      【解决方案6】:

      我认为你有点过度设计了一些东西。您并不真正关心线程或线程池,这是正确的。 Java 提供了很好的抽象,因此您不必这样做。您只需要知道您的任务何时完成,并且为此存在方法。只需提交您的工作,然后等待期货说他们已经完成。如果您真的想在单个任务完成后立即知道,您可以查看所有期货并在任何一项完成后立即采取行动。如果没有,并且您只关心一切是否完成,您可以从我即将发布的代码中删除一些复杂性。试试这个大小(注意 MultithreadedJaxrsResource 是可执行的):

      import javax.ws.rs.*;
      import javax.ws.rs.core.MediaType;
      import java.util.*;
      import java.util.concurrent.*;
      
      @Path("foo")
      public class MultithreadedJaxrsResource {
          private ExecutorService executorService;
      
          public MultithreadedJaxrsResource(ExecutorService executorService) {
              this.executorService = executorService;
          }
      
          @GET
          @Produces(MediaType.APPLICATION_XML)
          public AllMyArticles getStuff() {
              List<Future<Article>> futures = new ArrayList<Future<Article>>();
              // Submit all the tasks to run
              for (int i = 0; i < 10; i++) {
                  futures.add(executorService.submit(new Driver(i + 1)));
              }
              AllMyArticles articles = new AllMyArticles();
              // Wait for all tasks to finish
              // If you only care that everything is done and not about seeing
              // when each one finishes, this outer do/while can go away, and
              // you only need a single for loop to wait on each future.
              boolean allDone;
              do {
                  allDone = true;
                  Iterator<Future<Article>> futureIterator = futures.iterator();
                  while (futureIterator.hasNext()) {
                      Future<Article> future =  futureIterator.next();
                      if (future.isDone()) {
                          try {
                              articles.articles.add(future.get());
                              futureIterator.remove();
                          } catch (InterruptedException e) {
                              // thread was interrupted. don't do that.
                              throw new IllegalStateException("broken", e);
                          } catch (ExecutionException e) {
                              // execution of the Callable failed with an
                              // exception. check it out.
                              throw new IllegalStateException("broken", e);
                          }
                      } else {
                          allDone = false;
                      }
                  }
              } while (!allDone);
              return articles;
          }
      
          public static void main(String[] args) {
              ExecutorService executorService = Executors.newFixedThreadPool(10);
              AllMyArticles stuff =
                  new MultithreadedJaxrsResource(executorService).getStuff();
              System.out.println(stuff.articles);
              executorService.shutdown();
          }
      }
      
      class Driver implements Callable<Article> {
          private int i; // Just to differentiate the instances
      
          public Driver(int i) {
              this.i = i;
          }
      
          public Article call() {
              // Simulate taking some time for each call
              try {
                  Thread.sleep(1000 / i);
              } catch (InterruptedException e) {
                  System.err.println("oops");
              }
              return new Article(i);
          }
      }
      
      class AllMyArticles {
          public final List<Article> articles = new ArrayList<Article>();
      }
      
      class Article {
          public final int i;
      
          public Article(int i) {
              this.i = i;
          }
      
          @Override
          public String toString() {
              return "Article{" +
                             "i=" + i +
                             '}';
          }
      }
      

      这样做,您可以清楚地看到任务按照完成的顺序返回,因为最后一个任务由于睡眠时间最短而首先完成。如果您不关心完成顺序而只想等待所有完成,则循环会变得更加简单:

      for (Future<Article> future : futures) {
          try {
              articles.articles.add(future.get());
          } catch (InterruptedException e) {
              // thread was interrupted. don't do that.
              throw new IllegalStateException("broken", e);
          } catch (ExecutionException e) {
              // execution of the Callable failed with an exception. check it out.
              throw new IllegalStateException("broken", e);
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-03-02
        • 2017-10-25
        • 2012-11-03
        • 2016-04-18
        相关资源
        最近更新 更多