【问题标题】:Multithreaded search operation多线程搜索操作
【发布时间】:2009-07-19 14:55:41
【问题描述】:

我有一个方法可以接受一系列查询,我需要针对不同的搜索引擎 Web API(例如 Google 或 Yahoo's)运行它们。为了使进程并行化,为每个查询生成一个线程,然后在最后joined,因为我的应用程序只能在之后继续我得到每个的结果/em> 查询。我目前有这些方面的东西:

public abstract class class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine();
    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }

}

public class GoogleQuery extends Query {
    public Result querySearchEngine(String query) { 
        // access google rest API
    }
}

public class Querier {
    /* Every class that implements Query fills this array */
    private static ArrayList<Result> aggregatedResults;

    public static void addResults(Result[]) { // add to aggregatedResults }

    public static Result[] queryAll(Query[] queries) {
        /* for each thread, start it, to aggregate results */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults;
    }
}

最近,我发现Java 中有一个新的 API 用于执行并发作业。即Callable接口,FutureTaskExecutorService。我想知道这个新 API 是否应该使用,以及它们是否比传统的 RunnableThread 更有效。

在研究了这个新的 API 之后,我想出了以下代码(简化版):

   public abstract class Query implements Callable<Result[]> {
        private final String query; // gets set in the constructor

        public abstract Result[] querySearchEngine();
        @Override
        public Result[] call() {
            return querySearchEngine(query);
        }
    }

public class Querier {   
        private ArrayList<Result> aggregatedResults;

        public Result[] queryAll(Query[] queries) {
            List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
            final ExecutorService service = Executors.newFixedThreadPool(queries.length);  
            for (Query query : queries) {
                futures.add(service.submit(query));  
            }
            for (Future<Result[]> future : futures) {  
                aggregatedResults.add(future.get());  // get() is somewhat similar to join?
            }  
            return aggregatedResults;
        }
    }

我是这个并发 API 的新手,我想知道在上面的代码中是否有可以改进的地方,以及它是否比第一个选项更好(使用 @987654330 @)。有些课程我没有探索,例如FutureTask,等等。我也很想听听这方面的任何建议。

【问题讨论】:

  • 对我来说看起来不错,不确定我会在您的第二个示例中进行任何更改。在您的第一个示例中,我将扩展 Runnable 而不是 Thread,但这只是挑剔。
  • +1,对我来说已经足够了。

标签: java multithreading concurrency search-engine future


【解决方案1】:

您的代码存在一些问题。

  1. 您可能应该使用 ExecutorService.invokeAll() 方法。 创建新线程和新线程池的成本可能很高(尽管可能无法与调用外部搜索引擎相比)。 invokeAll() 可以为您管理线程。
  2. 您可能不想混合使用数组和泛型。
  3. 您调用的是 aggregatedResults.add() 而不是 addAll()。
  4. 如果成员变量可以是 queryAll() 函数调用的本地变量,则无需使用它们。

所以,类似以下的东西应该可以工作:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine();
    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {   
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries) {
        List<Future<List<Result>>> futures = executor.submitAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {  
            aggregatedResults.addAll(future.get());  // get() is somewhat similar to join?
        }  
        return aggregatedResults;
    }
}

【讨论】:

  • 更改为缓存线程池可能不是最佳选择,因为您的应用程序受 IO 限制,因为大多数搜索引擎都非常快并且会迅速响应。
  • @kd304:确实,我使用的搜索引擎非常快(目前是谷歌和雅虎)。但是,我使用了很多查询,因此需要并发。您对此有何建议?从我在 newCachedThreadPool 方法的 javadoc 上读到的内容来看,它似乎符合我的目的。不过话说回来,我对这个 API 还是很陌生。
  • @Avi:非常感谢您的建议!
  • @JG:很难说,因为 Java 中没有可用的自适应池,它会根据 I/O 与 CPU 的比率调整其大小。一种启发式方法是测量响应的等待时间、响应传递时间和响应处理时间,然后使用固定的池大小来交织它们。在我的 100MBit/2 核计算机上,通过使用大小为 10 的池进行处理可以实现最佳性能。
【解决方案2】:

作为进一步的改进,您可以考虑使用CompletionService 它解耦了提交和检索的顺序,而是将所有未来的结果放在一个队列中,您可以按照它们完成的顺序从中获取结果..

【讨论】:

  • 由于在这种情况下应用程序只能在 每个 任务完成后才能继续,因此 CompletionService 可能不适合此处。
  • @Avi:我不同意,它没有future.get()那么好。
  • @kd304: 你会使用 CompletionService 的什么方法来获取一组任务的所有结果?
  • 类似excCmpSrv.take().get() 的东西,如果没有任何提交的 Futures,你必须小心不要 take() (它会等待一个新的没有来的)。 . 使用 poll 或计算提交的 Callables 的数量是解决此问题的一种方法
【解决方案3】:

我可以建议您使用 Future.get() with a timeout 吗?

否则,只需要一个搜索引擎没有响应就可以让一切停止(如果你最终遇到网络问题,它甚至不需要是搜索引擎问题)

【讨论】:

  • 谢谢。用于此类操作的典型超时值是多少?
  • 我认为您需要问问自己准备等待多长时间 :-) 使其可配置并将其设置为(例如)正常响应时间的 10 倍。
  • 我认为超时代码中的正确层不是Future.get(),而是对搜索引擎本身的网络(HTTP?)调用。如果搜索引擎超时,最好将其捕获到那里,而不是占用不再需要的线程。
  • 假设(!)您正在谈论 HTTP。在代码库的更高、更抽象的区域中,我不一定会做出这样的假设。但是,我认为你是对的,在 HTTP 操作上设置超时总是一个好主意,然后引发适当的异常。所以我会在both Future.get() 和 HTTP 连接中设置一些超时。它们是否具有相同的值是另一回事。