【问题标题】:Guava Futures Wait for Callback番石榴期货等待回调
【发布时间】:2023-11-17 14:28:01
【问题描述】:

我有一个期货列表,在每个未来完成时,我都有一个应该执行的回调。

我正在使用 Futures.successfulAsList 检查是否所有的期货都已完成。但是,这并没有考虑到回调的完成。

有没有办法确保回调完成?

代替回调,我可以使用 Futures.transform 包装到另一个 Future 并检查它是否完成。但是,有了这个,我无法访问包装的未来引发的运行时异常。

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));

List<ListenableFuture<Object>> futures = new ArrayList<>();

for (int i = 1; i <= 20; i++) {
  final int x = i * 100;

  ListenableFuture<Object> future = service.submit(new Callable() {
    @Override
    public Object call() throws Exception {
      Thread.sleep(10000 / x);

      return x;
    }
  });

  futures.add(future);

  Futures.addCallback(future, new FutureCallback<Object>() {

    @Override
    public void onFailure(Throwable t) {
      t.printStackTrace();
    }

    @Override
    public void onSuccess(Object x) {
      try {Thread.sleep((Integer) x * 10);} catch (Exception e) {}

      System.out.println(x);
    }
  });
}

ListenableFuture<List<Object>> listFuture = Futures
    .successfulAsList(futures);
System.out.println("Waiting...");
System.out.println(listFuture.get());
System.out.println("Done");

【问题讨论】:

  • 这是一个非常好的问题,因为在我看来这可能是一个相当常见的用例,但 Guava API 不直接支持。

标签: java guava future


【解决方案1】:

无需睡眠即可实现:

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));

    List<ListenableFuture<Object>> futures = new ArrayList<>();

    for (int i = 1; i <= 20; i++) {
        final int x = i * 100;

        ListenableFuture<Object> future = service.submit(new Callable() {
            @Override
            public Object call() throws Exception {
                Thread.sleep(10000 / x);

                return x;
            }
        });

        Futures.addCallback(future, new FutureCallback<Object>() {

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onSuccess(Object x) {
                try {Thread.sleep((Integer) x * 10);} catch (Exception e) {}

                System.out.println(x);
            }
        });

        /* all Callbacks added in one list (ExecutionList) and executed by order. If not defined 3d argument (Executor)
           then callbacks executed sequentially at task thread.
         */
        final SettableFuture<Object> lastCalledFuture = SettableFuture.create();
        Futures.addCallback(future, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                lastCalledFuture.set(result);
            }

            @Override
            public void onFailure(Throwable t) {
                lastCalledFuture.setException(t);
            }
        });
        futures.add(lastCalledFuture);
    }

    ListenableFuture<List<Object>> listFuture = Futures
            .successfulAsList(futures);
    System.out.println("Waiting...");
    System.out.println(listFuture.get());
    System.out.println("Done");

【讨论】:

    【解决方案2】:

    如果您只想阻塞直到您提交的 N 个任务的回调全部完成,您可以创建一个 CountDownLatch,其中包含 N 个 count。然后在每个回调完成时调用 countDown()(无论它成功还是失败)和await()它在你想要阻止的地方。

    或者,您可以像在答案中那样做一些事情,但不要使用ListenableFutureTask&lt;Void&gt; 和无操作Runnable,只需使用SettableFuture&lt;Void&gt; 并在完成时调用set(null)

    【讨论】:

    • 我采用了CountDownLatch 方法。效果很好,实施起来很干净!
    【解决方案3】:

    谢谢你,这行得通!

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(20));
    
    List<ListenableFuture<Void>> futures = new ArrayList<>();
    
    for (int i = 1; i <= 20; i ++) {
      final int x = i * 100;
    
      ListenableFuture<Object> future = service.submit(new Callable(){
        @Override
        public Object call() throws Exception {
          Thread.sleep(10000 / x);
    
          return x;
        }
      });
    
      //Blank runnable to evaluate write completion
      Runnable callback = new Runnable(){
        @Override
        public void run() {
          //do nothing
        }
      };
    
      final ListenableFutureTask<Void> callbackFuture = ListenableFutureTask.create(callback, null);
    
      futures.add(callbackFuture);
    
      Futures.addCallback(future, new FutureCallback<Object>() {
    
        @Override
        public void onFailure(Throwable t) {
          try {
            t.printStackTrace();
          }
          finally {
            callbackFuture.run();
          }
        }
    
        @Override
        public void onSuccess(Object x) {
          try {
            try {Thread.sleep((Integer)x*10);}catch(Exception e){}
    
            System.out.println(x);
          }
          finally {
            callbackFuture.run();
          }
        }
      });
    }
    
    ListenableFuture<List<Void>> listFuture = Futures.successfulAsList(futures);
    System.out.println("Waiting...");
    System.out.println(listFuture.get());
    System.out.println("Done");
    

    【讨论】:

      【解决方案4】:

      如果您为每个回调创建另一个未来,并确保它将在回调内完成,怎么样。

      // create "callback" future here
      futures.add(callbackFuture);
      
      Futures.addCallback(future, new FutureCallback<Object>() {
      
        @Override
        public void onFailure(Throwable t) {
          t.printStackTrace();
          // do something with callbackFuture
        }
      
        @Override
        public void onSuccess(Object x) {
          try {Thread.sleep((Integer) x * 10);} catch (Exception e) {}
      
          System.out.println(x);
          // do something with callbackFuture
        }
      });
      

      【讨论】:

      • 你指的callbackFuture是什么?
      • 只是一些未来,您可以在回调中访问。没什么特别的。甚至不需要做任何事情。 (我不熟悉番石榴类;在 java.util.concurrent 中,您将使用 emtpy runnable 执行“new FutureTask()”,并在回调中调用 run(),然后 isDone() 将返回 true;番石榴肯定有等价物)