【问题标题】:Task scheduling using acceptEither synchronously on CompletableFuture在 CompletableFuture 上同步使用 acceptEither 进行任务调度
【发布时间】:2020-04-18 12:57:52
【问题描述】:

我正在通过 CompletableFuture API 学习并发性。假设我有两个任务:一个需要 250 毫秒,另一个需要 2500 毫秒。在以下代码中:

        Supplier<List<Long>> supplyIds = () -> {
            sleep(200);
            return(Arrays.asList(1L, 2L, 3L));
        };

        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
            sleep(250);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () ->  idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
            sleep(2500);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Consumer<List<User>> displayer = users -> {
            users.forEach(System.out::println);
        };

        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
        CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
        CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);

        users1.thenRun(()-> System.out.println("User 1"));
        users2.thenRun(()-> System.out.println("User 2"));
        users1.acceptEither(users2, displayer);

        sleep(6000);

我得到以下结果:

User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1

我了解代码是同步运行的,因为正在使用相同的公共分叉连接池线程,并且我们没有指定线程。我很困惑为什么首先执行fetchUsers2task,然后执行fetchUsers1task(这似乎与每次运行一致)。我假设因为 thenCompose 在代码中首先在 fetchUsers1 上调用,所以它会首先“排队”。

【问题讨论】:

    标签: java concurrency completable-future completion-stage


    【解决方案1】:

    文档中没有任何内容表明调用顺序对thenCompose 很重要。

    由于您定义了两个独立阶段,它们都只依赖于completableFuture,因此users1user2 之间没有定义的顺序,结果顺序只是依赖于实现。

    您可以在一个环境中重复地获得特定顺序,但在不同环境中获得不同的顺序。即使在您的环境中,也有可能在某些运行中获得不同的顺序。如果启动线程在调用supplyAsync(supplyIds) 200 毫秒后失去CPU,它可能会在调用thenCompose(fetchUsers2) 之前立即执行thenCompose(fetchUsers1) 指定的操作。

    当两个动作之间的顺序很重要时,您必须对它们之间的依赖关系进行建模。

    注意同样的代码

    users1.thenRun(()-> System.out.println("User 1"));
    users2.thenRun(()-> System.out.println("User 2"));
    users1.acceptEither(users2, displayer);
    

    定义完全独立的动作。由于acceptEither 应用于users1users2,而不是thenRun 调用返回的完成阶段,它不依赖于打印语句的完成。这三个动作可以按任意顺序执行。

    【讨论】:

    • 感谢您的精彩回答。您能否详细说明订单依赖于实现的含义?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-11
    • 1970-01-01
    • 1970-01-01
    • 2020-06-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多