【问题标题】:How to propagate timeout from one FutureTask to another dependent one used by its Callable?如何将超时从一个 FutureTask 传播到其 Callable 使用的另一个依赖项?
【发布时间】:2026-01-22 23:30:01
【问题描述】:

我处于以下情况(也许我对整个事情进行了过度设计,或者我陷入了完全的僵局,但想不出另一种方式来做到这一点):

  • 取一个或多个FutureTask实现异步计算(监听网络中的多播数据包,收集不同种类的统计信息),我们将其命名为MulticastStatisticsProvider
  • 进行另一个依赖于第一个任务的计算来执行额外的计算(将统计数据与公式结合以公开一些合成信息),此计算也是异步的,因此在另一个 FutureTask 中定义,我们将调用 FormulaComputing
  • 问题:我们希望FormulaComputing.get(timeout, timeUnit) 在调用MulticastStatisticsProvider.get(timeout, timeUnit) 时将超时传播到其内部Callable 并且找不到实现此目的的方法。

以下是我目前实现的代码状态:

  • 这是调用者代码。

    // This is the code creating the formula computing code.
    public FormulaComputing getFormulaComputing() {
      // Retrieve from another service a list of FutureTasks already
      // scheduled for execution in their own ThreadPool.
      List<MulticastStatisticsProvider> requiredTasks = service.getRequiredTasks();
      // Create the formulaComputing task and schedule it for execution
      FormulaComputing formulaComputing = new FormulaComputing(requiredTasks);
      threadPool.execute(formulaComputing);
      return formulaComputing;
    }
    
    // And then, from another caller
    getFormulaComputing().get(10, TimeUnit.SECONDS);
    
  • 这是FormulaComputing 代码:

    public class FormulaComputing extends FutureTask<Object> {
      private long timeout;
      private TimeUnit timeUnit;
      private Map<String, Future<Map<String, ? extends AbstractSymposiumMessage>>> promises;
      private Closure<Object> callback;
    
      public FormulaComputing(List<MulticastStatisticsProvider> dependentTasks, Closure<Object> callback) {
        super(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            List<Object> results = new ArrayList<Object>();
            for (MulticastStatisticsProvider task : dependentTasks) {
              // Here is the problem, you cannot access field values in constructor: "Cannot refer to an instance method while explicitly invoking a constructor".
              results.add(task.get(getTimeout(), getTimeUnit()));
            }
            return callback.call(results.toArray());
          }
        });
      }
    
      @Override
      public Object get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        return super.get(timeout, timeUnit);
      }
    }
    

我曾设想通过自省通过我在自定义 get 方法中使用我手工制作的 Callable 更改 FutureTask 的私有内部 sync 字段,但自省和反思通常是可以避免的黑客攻击。

【问题讨论】:

    标签: java multithreading asynchronous futuretask


    【解决方案1】:

    如果你使用Guava,它看起来是ListenableFutures 的好例子:

    List<ListenableFuture<Object>> requiredTasks = ...;
    
    ListenableFuture<List<Object>> requiredTasksResult = Futures.allAsList(requiredTasks);
    
    ListenableFuture<Object> resultFuture = Futures.transform(requiredTasksResult, new Function<List<Object>, Object>() {
        public Object apply(List<Object> results) {
            // Apply computing formula
        }
    }, threadPool); // Function will be executed in threadPool
    
    Object result = resultFuture.get(10, TimeUnit.SECONDS);
    

    您可以通过将FutureTask 提交给ListeningExecutorService 或改用ListenableFutureTask 来获得ListenableFuture

    【讨论】:

    • 这似乎正是我所需要的。从文档中读取:“最常见的是,ListenableFuture 用作另一个派生 Future 的输入,如 Futures.allAsList 中。如果没有侦听器支持,许多此类方法都无法有效实现。”。谢谢!
    最近更新 更多