【问题标题】:executing a method in parallel from a call method从调用方法并行执行方法
【发布时间】:2016-03-22 20:16:48
【问题描述】:

我有一个客户正在使用的库,他们正在传递 DataRequest 对象,其中包含 useridtimeout 和其他一些字段。现在我使用这个DataRequest 对象来创建一个URL,然后我使用RestTemplate 进行HTTP 调用,我的服务返回一个JSON 响应,我用它来创建一个DataResponse 对象并返回这个DataResponse 对象给他们。

下面是客户通过将DataRequest 对象传递给我的DataClient 类。如果在getSyncData 方法中花费的时间过多,我将使用客户在DataRequest 中传递的超时值来使请求超时。

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask类:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

到目前为止,我的 DataFetcherTask 类负责一个 DataRequest 键,如上所示..

问题陈述:-

现在我有一个小的设计更改。客户将DataRequest(例如keyA)对象传递给我的库,然后我将使用DataRequest(keyA)中存在的用户ID对另一个服务(我当前的设计中没有这样做)进行新的http调用对象将返回用户 ID 列表,因此我将使用这些用户 ID,并为响应中返回的每个用户 ID 创建几个其他 DataRequest (keyB, keyC, keyD) 对象。然后我将拥有List&lt;DataRequest&gt; 对象,该对象将具有 keyB、keyC 和 keyD DataRequest 对象。 List&lt;DataRequest&gt; 中的最大元素将是三个,仅此而已。

现在对于List&lt;DataRequest&gt; 中的每个DataRequest 对象,我想在DataFetcherTask.call 方法之上并行执行,然后通过为每个键添加每个DataResponse 来生成List&lt;DataResponse&gt;。所以我将对DataFetcherTask.call 进行三个并行调用。这个并行调用背后的想法是在相同的全局超时值中获取所有最多三个键的数据。

所以我的建议是 - DataFetcherTask 类将返回 List&lt;DataResponse&gt; 对象而不是 DataResponse,然后 getSyncDatagetAsyncData 方法的签名也会改变。所以这里是算法:

  • 使用客户传递的 DataRequest 对象通过调用另一个 HTTP 服务来生成List&lt;DataRequest&gt;
  • List&lt;DataRequest&gt;DataFetcherTask.call 方法中对每个DataRequest 进行并行调用,并将List&lt;DataResponse&gt; 对象而不是DataResponse 返回给客户。

通过这种方式,我可以在第 1 步和第 2 步中应用相同的全局超时。如果上述任何一个步骤都需要时间,我们将在 getSyncData 方法中超时。

DataFetcherTask 设计更改后的类:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

现在我的问题是 -

  • 一定要这样吗?解决这个问题的正确设计是什么?我的意思是在另一个call 方法中有call 方法看起来很奇怪?
  • 我们是否需要像我的代码中那样有两个执行程序?有没有更好的方法来解决这个问题或我们可以在这里做任何类型的简化/设计更改?

我已经简化了代码,以便清楚我想要做什么......

【问题讨论】:

  • 调用集合上的方法可能会合理地调用每个元素上的方法。这是一个相当普遍的模式。您可以为所有客户端设置一个全局 ExecutorService,尽管使用 ForkJoinPool 可能是更好的选择,因为您希望等待线程在等待时做一些工作。
  • 那么在我当前的情况下,我如何才能为所有客户提供全局ExecutorService?另外,您在哪里看到我有一个等待线程,因此我应该探索ForkJoinPool?我的意思是你为什么在这里推荐ForkJoinPool
  • 您可以通过将字段设为static 来使其成为全局字段。 Future.get() 是一个阻塞操作。想象一下,您有许多线程在与实际工作的线程池中执行此操作。您可能会到达池中所有线程都被阻塞的地步。
  • 您使用的是哪个版本的 Java?如果 Java 8 可用,CompletableFutures 为将任务链接在一起提供了一个强大的框架。如果没有,Guava 的 ListenableFuture 提供了类似的功能,但没有利用 Java 8 的 lambda。
  • 我现在使用的是 Java 7。我的公司需要一些时间才能开始使用 Java 8。:( ...哦,我明白了,我可以使用 ListenableFuture 看起来但我以前从未使用过它,所以不确定如何在我的示例中使用它。

标签: java multithreading thread-safety executorservice callable


【解决方案1】:

正如您问题的 cmets 中已经提到的,您可以使用 Java 的 ForkJoin 框架。这将为您节省DataFetcherTask 中的额外线程池。

您只需在您的DataClient 中使用ForkJoinPool 并将您的DataFetcherTask 转换为RecursiveTaskForkJoinTask 的子类型之一)。这使您可以轻松地并行执行其他子任务。

因此,在这些修改之后,您的代码将如下所示:

DataFetcherTask

DataFetcherTask 现在是 RecursiveTask,它首先生成密钥并为每个生成的密钥调用子任务。这些子任务在与父任务相同的ForkJoinPool 中执行。

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in contructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

数据客户端

DataClient 不会有太大变化,除了新的线程池:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responsList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responsList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

一旦您使用 Java8,您可能会考虑将实现更改为 CompletableFutures。然后它看起来像这样:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}

正如 cmets 中所述,Guava 的 ListenableFutures 将为 Java7 提供类似的功能,但如果没有 Lambda,它们往往会变得笨拙。

【讨论】:

    【解决方案2】:

    据我所知,RestTemplate 是阻塞的,它是在ForkJoinTask 的 ForkJoinPool JavaDoc 中说的:

    计算应避免同步方法或块,并应尽量减少其他阻塞同步,除了加入其他任务或使用同步器(如被宣传为与 fork/join 调度合作的 Phaser)。 ...
    任务也不应该执行阻塞 IO,...

    通话中的调用是多余的。
    而且您不需要两个执行者。您也可以在getSyncData(DataRequest key) 中返回部分结果。这可以像这样完成

    DataClient.java

    public class DataClient implements Client {
    
        private RestTemplate restTemplate = new RestTemplate();
        // first executor
        private ExecutorService service = Executors.newFixedThreadPool(15);
    
        @Override
        public List<DataResponse> getSyncData(DataRequest key) {
            List<DataResponse> responseList = null;
            DataFetcherResult response = null;
            try {
                response = getAsyncData(key);
                responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
            } catch (TimeoutException ex) {
                response.cancel(true);
                responseList = response.getPartialResult();
            }
            return responseList;
        }
    
        @Override
        public DataFetcherResult getAsyncData(DataRequest key) {
            List<DataRequest> keys = generateKeys(key);
            final List<Future<DataResponse>> responseList = new ArrayList<>();
            final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
            for (final DataRequest _key : keys) {
                responseList.add(service.submit(new Callable<DataResponse>() {
                    @Override
                    public DataResponse call() throws Exception {
                        DataResponse response = null;
                        try {
                            response = performDataRequest(_key);
                        } finally {
                            latch.countDown();
                            return response;
                        }
                    }
                }));
            }
            return new DataFetcherResult(responseList, latch);
        }
    
        // In this method I am making a HTTP call to another service
        // and then I will make List<DataRequest> accordingly.
        private List<DataRequest> generateKeys(DataRequest key) {
            List<DataRequest> keys = new ArrayList<>();
            // use key object which is passed in contructor to make HTTP call to another service
            // and then make List of DataRequest object and return keys.
            return keys;
        }
    
        private DataResponse performDataRequest(DataRequest key) {
            // This will have all LogicA code here which is shown in my original design.
            // everything as it is same..
            return null;
        }
    }
    

    DataFetcherResult.java

    public class DataFetcherResult implements Future<List<DataResponse>> {
        final List<Future<DataResponse>> futures;
        final CountDownLatch latch;
    
        public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
            this.futures = futures;
            this.latch = latch;
        }
    
        //non-blocking
        public List<DataResponse> getPartialResult() {
            List<DataResponse> result = new ArrayList<>(futures.size());
            for (Future<DataResponse> future : futures) {
                try {
                    result.add(future.isDone() ? future.get() : null);
                    //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                    //ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
                    //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
                }
            }
            return result;
        }
    
        @Override
        public List<DataResponse> get() throws ExecutionException, InterruptedException {
            List<DataResponse> result = new ArrayList<>(futures.size());
            for (Future<DataResponse> future : futures) {
                result.add(future.get());
            }
            return result;
        }
    
        @Override
        public List<DataResponse> get(long timeout, TimeUnit timeUnit)
                throws ExecutionException, InterruptedException, TimeoutException {
            if (latch.await(timeout, timeUnit)) {
                return get();
            }
            throw new TimeoutException();//or getPartialResult()
        }
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = true;
            for (Future<DataResponse> future : futures) {
                cancelled &= future.cancel(mayInterruptIfRunning);
            }
            return cancelled;
        }
    
        @Override
        public boolean isCancelled() {
            boolean cancelled = true;
            for (Future<DataResponse> future : futures) {
                cancelled &= future.isCancelled();
            }
            return cancelled;
        }
    
        @Override
        public boolean isDone() {
            boolean done = true;
            for (Future<DataResponse> future : futures) {
                done &= future.isDone();
            }
            return done;
        }
    
        //and etc.
    }
    

    我用CountDownLatch 编写了它,它看起来很棒,但请注意有细微差别。 你可能会在DataFetcherResult.get(long timeout, TimeUnit timeUnit) 中卡住一会儿,因为CountDownLatch 与未来的状态不同步。 latch.getCount() == 0 可能会发生,但并非所有期货都会同时返回 future.isDone() == true。因为他们已经在finally {} Callable 的块中通过了latch.countDown();,但没有改变内部state,它仍然等于NEW
    所以在get(long timeout, TimeUnit timeUnit) 内部调用get() 可能会导致一点延迟。
    类似的情况是described here

    Get with timeout DataFetcherResult.get(...) 可以使用期货 future.get(long timeout, TimeUnit timeUnit) 重写,您可以从类中删除 CountDownLatch

    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException{
        List<DataResponse> result = new ArrayList<>(futures.size());
        long timeoutMs = timeUnit.toMillis(timeout);
        boolean timeout = false;
        for (Future<DataResponse> future : futures) {
            long beforeGet = System.currentTimeMillis();
            try {
                if (!timeout && timeoutMs > 0) {
                    result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                    timeoutMs -= System.currentTimeMillis() - beforeGet;
                } else {
                    if (future.isDone()) {
                        result.add(future.get());
                    } else {
                        //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                    }
                }
            } catch (TimeoutException e) {
                result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
                timeout = true;
            }
            //you can also handle ExecutionException or CancellationException here
        }
    
        return result;
    }
    

    此代码是作为示例给出的,应该在生产中使用之前对其进行测试,但似乎是合法的:)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-08-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-01-30
      • 2019-12-08
      • 1970-01-01
      相关资源
      最近更新 更多