【发布时间】:2016-03-22 20:16:48
【问题描述】:
我有一个客户正在使用的库,他们正在传递 DataRequest 对象,其中包含 userid、timeout 和其他一些字段。现在我使用这个DataRequest 对象来创建一个URL,然后我使用RestTemplate 进行HTTP 调用,我的服务返回一个JSON 响应,我用它来创建一个DataResponse 对象并返回这个DataResponse 对象给他们。
下面是客户通过将DataRequest 对象传递给我的DataClient 类。如果在getSyncData 方法中花费的时间过多,我将使用客户在DataRequest 中传递的超时值来使请求超时。
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask类:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
到目前为止,我的 DataFetcherTask 类负责一个 DataRequest 键,如上所示..
问题陈述:-
现在我有一个小的设计更改。客户将DataRequest(例如keyA)对象传递给我的库,然后我将使用DataRequest(keyA)中存在的用户ID对另一个服务(我当前的设计中没有这样做)进行新的http调用对象将返回用户 ID 列表,因此我将使用这些用户 ID,并为响应中返回的每个用户 ID 创建几个其他 DataRequest (keyB, keyC, keyD) 对象。然后我将拥有List<DataRequest> 对象,该对象将具有 keyB、keyC 和 keyD DataRequest 对象。 List<DataRequest> 中的最大元素将是三个,仅此而已。
现在对于List<DataRequest> 中的每个DataRequest 对象,我想在DataFetcherTask.call 方法之上并行执行,然后通过为每个键添加每个DataResponse 来生成List<DataResponse>。所以我将对DataFetcherTask.call 进行三个并行调用。这个并行调用背后的想法是在相同的全局超时值中获取所有最多三个键的数据。
所以我的建议是 - DataFetcherTask 类将返回 List<DataResponse> 对象而不是 DataResponse,然后 getSyncData 和 getAsyncData 方法的签名也会改变。所以这里是算法:
- 使用客户传递的 DataRequest 对象通过调用另一个 HTTP 服务来生成
List<DataRequest>。 - 在
List<DataRequest>到DataFetcherTask.call方法中对每个DataRequest进行并行调用,并将List<DataResponse>对象而不是DataResponse返回给客户。
通过这种方式,我可以在第 1 步和第 2 步中应用相同的全局超时。如果上述任何一个步骤都需要时间,我们将在 getSyncData 方法中超时。
DataFetcherTask 设计更改后的类:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
现在我的问题是 -
- 一定要这样吗?解决这个问题的正确设计是什么?我的意思是在另一个
call方法中有call方法看起来很奇怪? - 我们是否需要像我的代码中那样有两个执行程序?有没有更好的方法来解决这个问题或我们可以在这里做任何类型的简化/设计更改?
我已经简化了代码,以便清楚我想要做什么......
【问题讨论】:
-
调用集合上的方法可能会合理地调用每个元素上的方法。这是一个相当普遍的模式。您可以为所有客户端设置一个全局 ExecutorService,尽管使用 ForkJoinPool 可能是更好的选择,因为您希望等待线程在等待时做一些工作。
-
那么在我当前的情况下,我如何才能为所有客户提供全局
ExecutorService?另外,您在哪里看到我有一个等待线程,因此我应该探索ForkJoinPool?我的意思是你为什么在这里推荐ForkJoinPool? -
您可以通过将字段设为
static来使其成为全局字段。Future.get()是一个阻塞操作。想象一下,您有许多线程在与实际工作的线程池中执行此操作。您可能会到达池中所有线程都被阻塞的地步。 -
您使用的是哪个版本的 Java?如果 Java 8 可用,
CompletableFutures 为将任务链接在一起提供了一个强大的框架。如果没有,Guava 的ListenableFuture提供了类似的功能,但没有利用 Java 8 的 lambda。 -
我现在使用的是 Java 7。我的公司需要一些时间才能开始使用 Java 8。:( ...哦,我明白了,我可以使用
ListenableFuture看起来但我以前从未使用过它,所以不确定如何在我的示例中使用它。
标签: java multithreading thread-safety executorservice callable