【发布时间】:2020-11-05 13:17:45
【问题描述】:
我有一个用例,我应该设置一个在特定时间间隔触发的计划作业。工作是根据特定标准获取工作项(如果有的话)并处理它们。在处理这些项目时,我尝试使用带有固定线程池的 Executor 并并行处理它们。
我希望主线程是一个计划作业,它获取工作项以将任务提交给执行程序并继续进行下一次迭代。但是我的任务也返回处理的结果,所以我需要读取每个任务的结果并相应地处理它们。由于我必须阅读结果,因此我无法在将任务提交给执行程序时结束主线程处理。主线程正在(技术上)等待任务完成并处理结果,然后结束其执行
我的轮询类 - Poller.java - 它以一定的频率保持轮询
public void startPollingProcess() {
try {
List<RenderingsLogEntity> extractionLogDelta = fetchWorkItems();
if (extractionLogDelta != null && !extractionLogDelta.isEmpty()) {
processor.processWorkItems(extractionLogDelta);
}
}
}
我的处理器类 - Processor.java
// initializing the executor at some point in the class. And the thread pool will be using a worker queue with bounded capacity. If the queue is full for any reason and cant take any more tasks it will throw a RejectedExecutionException as you can see
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(3000))
public void processWorkItems(List<RenderingsLogEntity> workItems) {
List<Future<ProcessingResult>> processingResultsList = new ArrayList<>();
try {
// each task is submitted to the thread pool of bounded queue size. I would want the main thread to just submit these tasks. If any exception handle that.
for (WorkItem item : workItems) {
processingResultsList.add(executorService.submit(() -> furtherProcessing(item)));
}
} catch (RejectedExecutionException e) {
// do something in case of exception
}
}
// Passing the handle on Future Results to a different method
if (!deltaProcessingResultList.isEmpty())
publishResults(deltaProcessingResultList);
}
private void publishResults(List<Future<ProcessingResult>> processingResultsList) {
try {
// Reading the results of each task and handling it. This is still being done by the main thread and result.get() is a blocking call. My polling execution is not complete until I handle these results. But I want that to be done by someone else and I want my polling to continue
for (Future<ProcessingResult> result : deltaProcessingResultList) {
ProcessingResult resultStatus = result.get();
// handle the result accordingly
}
} catch (InterruptedException | ExecutionException e) {
logger.error(e.getMessage(), e);
}
}
【问题讨论】:
标签: java scheduled-tasks threadpool executorservice java.util.concurrent