【问题标题】:Execute each subtask in parallel in a multithreaded environment在多线程环境中并行执行每个子任务
【发布时间】:2017-03-09 00:54:43
【问题描述】:

我正在开发一个库,它将一个对象 DataRequest 作为输入参数并基于该对象,我将构造一个 URL,然后使用 apache 调用我们的应用服务器http 客户端,然后将响应返回给正在使用我们库的客户。有些客户会调用 executeSync 方法来获取相同的功能,有些客户会调用我们的 executeAsync 方法来获取数据。

  • executeSync() - 等到我有结果,返回结果。
  • executeAsync() - 立即返回一个 Future,如果需要,可以在其他事情完成后处理它。

下面是我的 DataClient 类,它有以上两种方法:

public class DataClient implements Client {
  private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
  private CloseableHttpClient httpClientBuilder;

  // initializing httpclient only once
  public DataClient() {
    try {
      RequestConfig requestConfig =
          RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
              .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
      SocketConfig socketConfig =
          SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();

      PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
          new PoolingHttpClientConnectionManager();
      poolingHttpClientConnectionManager.setMaxTotal(300);
      poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);

      httpClientBuilder =
          HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
              .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
    } catch (Exception ex) {
      // log error
    }
  }

  @Override
  public List<DataResponse> executeSync(DataRequest key) {
    List<DataResponse> responsList = null;
    Future<List<DataResponse>> responseFuture = null;

    try {
      responseFuture = executeAsync(key);
      responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
    } catch (TimeoutException | ExecutionException | InterruptedException ex) {
      responsList =
          Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
              DataStatusEnum.ERROR));
      responseFuture.cancel(true);
      // logging exception here
    }
    return responsList;
  }

  @Override
  public Future<List<DataResponse>> executeAsync(DataRequest key) {
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
    return this.forkJoinPool.submit(task);
  }
}

下面是我的 DataFetcherTask 类,它还有一个静态类 DataRequestTask,它通过 URL 调用我们的应用服务器:

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
  private final DataRequest key;
  private final CloseableHttpClient httpClientBuilder;

  public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
    this.key = key;
    this.httpClientBuilder = httpClientBuilder;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }
    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }
    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
    List<DataRequest> keys = new ArrayList<>();
    // use key object which is passed in contructor to make HTTP call to another service
    // and then make List of DataRequest object and return keys.
    return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {
    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      MappingHolder mappings = DataMapping.getMappings(key.getType());
      List<String> hostnames = mappings.getAllHostnames(key);

      for (String hostname : hostnames) {
        String url = generateUrl(hostname);
        HttpGet httpGet = new HttpGet(url);
        httpGet.setConfig(generateRequestConfig());
        httpGet.addHeader(key.getHeader());

        try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
          HttpEntity entity = response.getEntity();
          String responseBody =
              TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
                  StandardCharsets.UTF_8);

          return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
        } catch (IOException ex) {
          // log error
        }
      }
      return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
    }
  }
}

对于每个DataRequest 对象,都有一个DataResponse 对象。现在一旦有人通过传递DataRequest 对象调用我们的库,我们在内部创建List&lt;DataRequest&gt; 对象,然后我们并行调用每个DataRequest 对象并返回List&lt;DataResponse&gt;,其中列表中的每个DataResponse 对象将响应对应DataRequest对象。

以下是流程:

  • 客户将通过传递DataRequest 对象来调用DataClient 类。他们可以根据需要调用executeSync()executeAsync() 方法。
  • 现在在DataFetcherTask 类(这是RecursiveTaskForkJoinTask's 子类型之一)中,给定一个key 对象,它是一个DataRequest,我将生成List&lt;DataRequest&gt;,然后调用每个子任务对列表中的每个 DataRequest 对象并行处理。这些子任务在与父任务相同的ForkJoinPool 中执行。
  • 现在在DataRequestTask 类中,我通过创建一个URL 执行每个DataRequest 对象并返回其DataResponse 对象。

问题陈述:

由于这个库是在一个非常高吞吐量的环境中调用的,所以它必须非常快。对于同步调用,这里可以在单独的线程中执行吗?在这种情况下,它会产生线程的额外成本和资源以及线程上下文切换的成本,所以我有点困惑。另外我在这里使用ForkJoinPool,这将节省我使用额外线程池的时间,但它在这里是正确的选择吗?

有没有更好、更有效的方法来做同样的事情,同时也能提高性能?我正在使用 Java 7 并且也可以访问 Guava 库,所以如果它可以简化任何事情,那么我也愿意这样做。

当它在非常重的负载下运行时,我们似乎看到了一些争用。当在非常重的负载下运行时,这段代码有什么办法可以进入线程争用?

【问题讨论】:

  • 听起来像ThreadPool 会很有用,但请记住,过早的优化是万恶之源
  • @ScaryWombat 同意,这就是为什么我会进行负载测试,但问题是我所拥有的是否合理,使用 ForkJoinPool 也是一种特殊形式的 ThreadPool。那么我使用executeSync方法的方式对不对?
  • 您看到什么样的争用?也许对于重负载new ForkJoinPool(16); 不够用,尝试将16 增加到更大的值

标签: java multithreading guava executorservice forkjoinpool


【解决方案1】:

我认为在您的情况下,最好使用异步 http 调用,请参阅链接:HttpAsyncClient。而且您不需要使用线程池。

在 executeAsync 方法中创建空的 CompletableFuture() 并将其传递给客户端调用,在回调调用中通过调用 complete 来设置 completableFuture 的结果(如果引发异常,则为 completeExceptionally)。 ExecuteSync 方法实现看起来不错。

编辑:

对于 java 7,它只需要替换一个 completableFuture 来承诺在 guava 中的实现,比如 ListenableFuture 或类似的东西

【讨论】:

    【解决方案2】:

    选择使用ForkJoinPool 是正确的,它旨在提高许多小任务的效率:

    ForkJoinPool 与其他类型的 ExecutorService 的不同之处主要在于它采用了工作窃取:池中的所有线程都尝试查找并执行提交到池和/或由其他活动任务创建的任务(如果最终阻塞等待工作,如果不存在)。当大多数任务产生其他子任务(大多数 ForkJoinTasks 也是如此)时,以及当许多小任务从外部客户端提交到池时,这可以实现高效处理。尤其是在构造函数中将 asyncMode 设置为 true 时,ForkJoinPools 也可能适用于从未加入的事件式任务。

    我建议在构造函数中尝试asyncMode = true,因为在您的情况下,任务从未加入:

    public class DataClient implements Client {
        private final ForkJoinPool forkJoinPool = new ForkJoinPool(16, ForkJoinPool.ForkJoinWorkerThreadFactory, null, true);
    ...
    }
    

    对于executeSync(),您可以使用forkJoinPool.invoke(task),这是在池中执行同步任务以优化资源的托管方式:

    @Override
    public List<DataResponse> executeSync(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
      return this.forkJoinPool.invoke(task);
    }
    

    如果您可以使用 Java 8,那么已经优化了一个公共池:ForkJoinPool.commonPool()

    【讨论】:

    • 你能给我举个例子,我应该使用asyncMode=true吗?还有我的executeSync() 方法会是什么样子?只是对此有点困惑。
    • 我在答案中添加了一些示例
    • 这意味着我不需要在executeSync 中调用executeAsync 方法,只需按照您的建议进行操作?如果是,那么现在超时如何出现?我的意思是如果调用超时则同步,然后我返回超时响应。这将如何在这里工作?
    • forkJoinPool.invoke(task) 不处理超时,只是执行task.compute() 方法并等待结果,这种情况下唯一的超时是 HttpClient 超时。要管理超时,您的方法 executeSync() 很好。
    • @david,您管理executeSync 中的超时,但是executeAsync 中的超时呢?每个调用者都必须管理它?
    猜你喜欢
    • 2018-10-07
    • 2012-06-25
    • 2014-11-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-21
    • 1970-01-01
    相关资源
    最近更新 更多