【问题标题】:Why is CompletableFuture.supplyAsync succeeding a random number of times?为什么 CompletableFuture.supplyAsync 会成功随机次数?
【发布时间】:2017-07-15 07:38:38
【问题描述】:

我对 Java 8 中的 lambda 和异步代码都是新手。我不断得到一些奇怪的结果...

我有以下代码:

import java.util.concurrent.CompletableFuture;

public class Program {

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            String test = "Test_" + i;
            final int a = i;

            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(test));
            cf.thenRun(() -> System.out.println(a)) ;
        }
    }

    private static boolean doPost(String t) {
        System.out.println(t);

        return true;
    }
}

实际代码要长得多,因为doPost 方法会将一些数据发布到Web 服务。但是,我可以用这个基本代码复制我的问题。

我想让 doPost 方法执行 100 次,但出于性能原因异步执行(为了比执行 100 次同步调用更快地将数据推送到 Web 服务)。

在上面的代码中,“doPost”方法运行的次数是随机的,但总是不超过 20-25 次。没有抛出异常。似乎某些线程处理机制默默地拒绝创建新线程并执行它们的代码,或者线程默默地崩溃而不使程序崩溃。

我还有一个问题,如果我向doPost 方法添加的功能比上面显示的更多,它会达到该方法只是静默中断的地步。在这种情况下,我尝试在 return 语句之前添加一个System.out.println("test"),但它从未被调用。循环 100 次的循环确实运行了 100 次迭代。

至少可以说,这种行为令人困惑。

我错过了什么?为什么作为参数提供给supplyAsync 的函数运行看似随机的次数?

编辑:只是想指出,情况与被标记为可能重复的问题并不完全相同,因为该问题涉及任意深度嵌套的期货,而这个问题涉及并行那些。但是,它们失败的原因实际上是相同的。这些案例似乎足够不同,值得向我提出单独的问题,但其他人可能不同意......

【问题讨论】:

  • 您的 main 方法不会等待期货完成,因此 JVM 在您的任务完成执行之前退出。
  • 谢谢@DidierL - 现在你提到它似乎很明显。我习惯了异步代码,但是在 Javascript 的上下文中,它有一个事件循环。那么在Java中,那么需要一种引用计数技术吗?在这种情况下,由于异步的东西是在 main() 中正确完成的,因此使用 .allOf() 创建一个新的 CompletableFuture 并在其上执行一个 .get() 可能就足够了;但据我了解, .get() 是阻塞的,所以如果想避免阻塞,一种引用计数技术应该可以解决这个问题。
  • @JoeDyndale 在等待所有请求完成时(即在程序结束之前),阻塞任何请求(以任何顺序)都可以。其他人仍将根据需要运行,这与加入 N 个创建的线程没有什么不同,这些线程必须在继续更多工作之前完成。如果有更多异步工作需要在没有统一连接的情况下完成,那么这应该是“流水线”作为先前工作的结果——这听起来不像这里的情况。
  • 阻塞本身没有问题,特别是因为您的主线程无事可做,并且您希望它等到您的期货完成。

标签: java asynchronous lambda completable-future


【解决方案1】:

默认情况下CompletableFuture 使用自己的ForkJoinPool.commonPool()(参见CompletableFuture 实现)。这个默认池只创建 daemon 线程,例如如果它们还活着,它们不会阻止主应用程序终止。

您有以下选择:

  1. 将所有CompletionStage 收集到某个数组中,然后生成java.util.concurrent.CompletableFuture#allOf().toCompletableFuture().join() - 这将保证在join()

  2. 在您自己的线程池中使用 *Async 操作,该线程池仅包含 非守护进程 线程,如下例所示:

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10, r -> {
            Thread t = new Thread(r);
            t.setDaemon(false); // must be not daemon
            return t;
        });
    
        for (int i = 0; i < 100; i++) {
            final int a = i;
    
            // the operation must be Async with our thread pool
            CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> doPost(a), pool);
            cf.thenRun(() -> System.out.printf("%s: Run_%s%n", Thread.currentThread().getName(), a));
        }
    
        pool.shutdown(); // without this the main application will be blocked forever
    }
    
    private static boolean doPost(int t) {
        System.out.printf("%s: Post_%s%n", Thread.currentThread().getName(), t);
    
        return true;
    }
    

【讨论】:

  • 这也是我在其他地方发现的。忘记回来并再次更新问题。所以特此接受回答:)
猜你喜欢
  • 2014-01-05
  • 1970-01-01
  • 1970-01-01
  • 2013-09-26
  • 1970-01-01
  • 1970-01-01
  • 2016-08-19
  • 2020-03-06
  • 1970-01-01
相关资源
最近更新 更多