【发布时间】:2020-04-18 12:57:52
【问题描述】:
我正在通过 CompletableFuture API 学习并发性。假设我有两个任务:一个需要 250 毫秒,另一个需要 2500 毫秒。在以下代码中:
Supplier<List<Long>> supplyIds = () -> {
sleep(200);
return(Arrays.asList(1L, 2L, 3L));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
sleep(250);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
sleep(2500);
System.out.println("User2"+ Thread.currentThread().getName());
Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
return(CompletableFuture.supplyAsync(userSupplier));
};
Consumer<List<User>> displayer = users -> {
users.forEach(System.out::println);
};
CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);
users1.thenRun(()-> System.out.println("User 1"));
users2.thenRun(()-> System.out.println("User 2"));
users1.acceptEither(users2, displayer);
sleep(6000);
我得到以下结果:
User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1
我了解代码是同步运行的,因为正在使用相同的公共分叉连接池线程,并且我们没有指定线程。我很困惑为什么首先执行fetchUsers2task,然后执行fetchUsers1task(这似乎与每次运行一致)。我假设因为 thenCompose 在代码中首先在 fetchUsers1 上调用,所以它会首先“排队”。
【问题讨论】:
标签: java concurrency completable-future completion-stage