【问题标题】:Java parallel work iterator?Java并行工作迭代器?
【发布时间】:2010-11-16 17:43:36
【问题描述】:

我正在寻找一个类,我可以在其中重写一个方法来完成工作,并像迭代器一样返回结果。像这样的:

ParallelWorkIterator<Result> itr = new ParallelWorkIterator<Result>(trials,threads) {

  public Result work() {
    //do work here for a single trial...
    return answer;
  }

};
while (itr.hasNext()) {
  Result result = itr.next();
  //process result...
}

这主要用于蒙特卡罗模拟之类的事情,但我不想每次都处理设置线程池和管理返回线程。我推出了自己的课程,希望能做到这一点,但我对它不够自信,我想检查一下这样的东西是否已经存在。

编辑:明确地说,我希望它在每个工作方法返回后继续在后台运行并排队结果,直到所有试验都完成。所以下一个方法可能会等到队列中有结果才返回。

【问题讨论】:

    标签: java multithreading concurrency iterator


    【解决方案1】:

    我能想到的最接近的方法是使用CompletionService 在完成时累积结果。

    简单示例:

    ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service.
    CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor); // Completion service wraps executor and is notified of results as they complete.
    Callable<Result> callable = new MyCallable();
    
    executor.submit(callable); // Do not store handle to Future here but rather obtain from CompletionService when we *know* the result is complete.
    
    Future<Result> fut = completionService.take(); // Will block until a completed result is available.
    Result result = fut.get(); // Will not block as we know this future represents a completed result.
    

    我不建议将其包装在 Iterator 接口后面,因为 Future get() 方法可能会抛出两个可能的检查异常:ExecutionExceptionInterruptedException,因此您需要捕获并吞下这些异常或者将它们重新抛出为RuntimeExceptions,这两者都不是一件好事。此外,如果有正在进行的任务,您的IteratorhasNext()next() 方法可能需要阻塞,这对于使用Iterator 的客户端可能被认为是违反直觉的。相反,我会实现自己的更具描述性的界面;例如

    public interface BlockingResultSet {
      /**
       * Returns next result when it is ready, blocking is required.
       * Returns null if no more results are available.
       */  
      Result take() throws InterruptedException, ExecutionException;
    }
    

    (称为take() 的方法通常表示java.util.concurrent 包中的阻塞调用)。

    【讨论】:

      【解决方案2】:

      看看ExecutorCompletionService。它可以满足您的所有需求。

         void solve(Executor e, Collection<Callable<Result>> solvers)
           throws InterruptedException, ExecutionException {
             //This class will hold and execute your tasks
             CompletionService<Result> ecs
                 = new ExecutorCompletionService<Result>(e);
             //Submit (start) all the tasks asynchronously
             for (Callable<Result> s : solvers)
                 ecs.submit(s);
             //Retrieve completed task results and use them
             int n = solvers.size();
             for (int i = 0; i < n; ++i) {
                 Result r = ecs.take().get();
                 if (r != null)
                     use(r);
             }
         }
      

      使用 CompletionService 的好处是它总是返回第一个完成的结果。这可确保您无需等待任务完成,并让未完成的任务在后台运行。

      【讨论】:

      • 谢谢,看起来它可以满足我的所有需求。我只是希望 java 不是那么冗长。
      • 冗长明确是一把双刃剑的一半。
      • 我为处理冗长所做的是编写一个扩展 ExecutorCompletionService 的包装类,并保留提交的 Futures 列表。 public boolean hasRemaining(){return 0
      • @Tim 是的,我可能只是将它封装在我已经编写的类中。
      【解决方案3】:

      我建议查看 Java Executors

      您提交了许多任务并为每个任务返回一个Future 对象。您的工作在后台处理,并且您遍历 Future 对象(就像您在上面所做的那样)。每个 Future 在可用时返回一个结果(通过调用 get() - 这会阻塞,直到结果在单独的线程中生成)

      【讨论】:

        猜你喜欢
        • 2014-07-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-08-15
        • 1970-01-01
        • 2016-07-04
        • 1970-01-01
        相关资源
        最近更新 更多