【问题标题】:Decoupling task submission and result processing while using ExecutorService/ThreadPool使用ExecutorService/ThreadPool时解耦任务提交和结果处理
【发布时间】:2020-11-05 13:17:45
【问题描述】:

我有一个用例,我应该设置一个在特定时间间隔触发的计划作业。工作是根据特定标准获取工作项(如果有的话)并处理它们。在处理这些项目时,我尝试使用带有固定线程池的 Executor 并并行处理它们。

我希望主线程是一个计划作业,它获取工作项以将任务提交给执行程序并继续进行下一次迭代。但是我的任务也返回处理的结果,所以我需要读取每个任务的结果并相应地处理它们。由于我必须阅读结果,因此我无法在将任务提交给执行程序时结束主线程处理。主线程正在(技术上)等待任务完成并处理结果,然后结束其执行

我的轮询类 - Poller.java - 它以一定的频率保持轮询

 public void startPollingProcess() {

        try {
            List<RenderingsLogEntity> extractionLogDelta = fetchWorkItems();

            if (extractionLogDelta != null && !extractionLogDelta.isEmpty()) {
                processor.processWorkItems(extractionLogDelta);
            } 

        } 
    }

我的处理器类 - Processor.java

// initializing the executor at some point in the class. And the thread pool will be using a worker queue with bounded capacity. If the queue is full for any reason and cant take any more tasks it will throw a RejectedExecutionException as you can see

ExecutorService executorService = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(3000))


 public void processWorkItems(List<RenderingsLogEntity> workItems) {


        List<Future<ProcessingResult>> processingResultsList = new ArrayList<>();

        try {

            // each task is submitted to the thread pool of bounded queue size. I would want the main thread to just submit these tasks. If any exception handle that. 
            for (WorkItem item : workItems) {
                processingResultsList.add(executorService.submit(() -> furtherProcessing(item)));
            }

        } catch (RejectedExecutionException e) {

           // do something in case of exception
            }
        }

     // Passing the handle on Future Results to a different method
        if (!deltaProcessingResultList.isEmpty())
            publishResults(deltaProcessingResultList);
    }


    private void publishResults(List<Future<ProcessingResult>> processingResultsList) {

        try {
            // Reading the results of each task and handling it. This is still being done by the main thread and result.get() is a blocking call.  My  polling execution  is not complete until I handle these results. But I want that to be done by someone else and I want my polling to continue 
            for (Future<ProcessingResult> result : deltaProcessingResultList) {
                ProcessingResult resultStatus = result.get();

            // handle the result accordingly
            }

        } catch (InterruptedException | ExecutionException e) {
            logger.error(e.getMessage(), e);
        }
    }


【问题讨论】:

    标签: java scheduled-tasks threadpool executorservice java.util.concurrent


    【解决方案1】:

    听起来您想在工作线程中处理WorkItem publishResult;

    public void processWorkItems(List<RenderingsLogEntity> workItems) {
        try {
          for (WorkItem item : workItems) {
            executorService.submit(() -> {
              ProcessingResult result = furtherProcessing(item);
              publishResult(result);
            });
          }
        } catch (RejectedExecutionException e) {
           // handle exception
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-04-16
      • 1970-01-01
      • 2011-06-15
      • 2015-01-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多