【问题标题】:How to kill CompletableFuture related threads?如何杀死 CompletableFuture 相关线程?
【发布时间】:2023-11-11 11:47:01
【问题描述】:

我有检查 CompletableFuture 执行时间的方法。如果这样的 CompletableFuture 执行时间超过 2 秒,我想终止此任务。但是,如果我没有控制执行 CompletableFuture 方法的线程,我该怎么办?

       final CompletableFuture<List<List<Student>>> responseFuture = new CompletableFuture<>();
responseFuture.supplyAsync(this::createAllRandomGroups)
        .thenAccept(this::printGroups)
        .exceptionally(throwable -> {
            throwable.printStackTrace();
            return null;
        });

createAllRandomGroups()

private List<List<Student>> createAllRandomGroups() {
    System.out.println("XD");
    List<Student> allStudents = ClassGroupUtils.getActiveUsers();
    Controller controller = Controller.getInstance();
    List<List<Student>> groups = new ArrayList<>();
    int groupSize = Integer.valueOf(controller.getGroupSizeComboBox().getSelectionModel().getSelectedItem());
    int numberOfGroupsToGenerate = allStudents.size() / groupSize;
    int studentWithoutGroup = allStudents.size() % groupSize;
    if (studentWithoutGroup != 0) groups.add(this.getListOfStudentsWithoutGroup(allStudents, groupSize));
    for(int i = 0; i < numberOfGroupsToGenerate; i++) {
        boolean isGroupCreated = false;
        while (!isGroupCreated){
            Collections.shuffle(allStudents);
            List<Student> newGroup = this.createNewRandomGroupOfStudents(allStudents, groupSize);
            groups.add(newGroup);
            if (!DataManager.isNewGroupDuplicated(newGroup.toString())) {
                isGroupCreated = true;
                allStudents.removeAll(newGroup);
            }
        }
    }
    DataManager.saveGroupsToCache(groups);
    return groups;
}

printGroups()

private void printGroups(List<List<Student>> lists) {
        System.out.println(lists);

    }

此语句responseFuture.cancel(true); 不会杀死 responseFuture 正在执行方法的线程。那么终止 CompletableFuture 线程最优雅的方法是什么?

【问题讨论】:

  • 你假设有一个线程要杀死。但这看起来就像一个异步操作链,可能根本没有线程在等待。你不想杀死一个线程,你想取消异步操作。
  • @DanielPryden 那么我怎样才能杀死这些操作呢?
  • 这取决于什么操作需要花费时间。你能显示createAllRandomGroupsprintGroups 的代码吗?
  • @DanielPryden 当然,完成。
  • 我是不是看错了什么?看起来您在这些方法中根本没有做任何异步工作。你为什么要使用 CompletableFuture?

标签: java multithreading java-8 future completable-future


【解决方案1】:

当您创建像b = a.thenApply(function) 这样的CompletableFuture 阶段链时,这种方便的方法会创建不同组件的设置。基本上,这些组件相互引用为a → function → b,因此a 的完成将触发function 的评估,这将首先预先检查b 是否仍未完成,然后评估您的功能并尝试用结果完成b

b 本身不知道function 或将评估它的线程。事实上,functionb 来说并不特殊,任何人都可以从任何线程调用completecompleteExceptionallycancel,第一个获胜。因此,类名中的 completable

掌握评估函数的线程的唯一方法是从一开始就控制它们,例如

ExecutorService myWorkers = Executors.newFixedThreadPool(2);

CompletableFuture<FinalResultType> future
    = CompletableFuture.supplyAsync(() -> generateInitialValue(), myWorkers)
                       .thenApplyAsync(v -> nextCalculation(v), myWorkers)
                       .thenApplyAsync(v -> lastCalculation(v), myWorkers);
future.whenComplete((x,y) -> myWorkers.shutdownNow());

现在,完成future,例如通过取消,将确保此链不会触发新的评估,并进一步尝试中断正在进行的评估(如果有)。

所以你可以实现一个超时,例如

try {
    try {
        FinalResultType result = future.get(2, TimeUnit.SECONDS);
        System.out.println("got "+result);
    }
    catch(TimeoutException ex) {
        if(future.cancel(true)) System.out.println("cancelled");
        else System.out.println("got "+future.get());
    }
}
catch(ExecutionException|InterruptedException ex) {
    ex.printStackTrace();
}

并不是说由于线程池的关闭而拒绝任务可能会导致一些中间未来永远无法完成,但是对于这一系列阶段来说,这是无关紧要的。重要的是,最后阶段 future 已完成,这是有保证的,因为它的完成会触发关闭。

【讨论】:

  • 但是超时后在未来调用的取消方法真的会杀死这个未来还是只是试图杀死它?
  • 对于CompletableFuture,取消只是完成的特殊形式,所以它只能有两种可能的结果之一,a)它成功取消或b)未来通过其他方式完成。无论哪种情况,当cancel 返回时,未来将完成。我的答案的解决方案将关闭执行程序,无论发生了什么完成。然后,保证不能安排新的任务。它将中断已经运行的评估,这需要他们一方的中断支持才能提前终止;没有办法。
  • @Holger 但 OP 询问:执行超过 2 秒我想终止此任务,虽然此代码确实会将未来标记为已完成,但它不会终止仍将继续进行的基础工作,对吗?
  • @Eugene 请注意whenComplete((x,y) -&gt; myWorkers.shutdownNow()),它将中断所有正在进行的任务。这是你能得到的最好的。
【解决方案2】:

终止线程的唯一方法是通过中断,这是一种协作机制。这意味着线程必须通过处理 InterruptedException 来实现中断逻辑。

但中断不属于你的线程是一种非常糟糕的做法,我认为这是你的情况。

【讨论】:

  • 那么我怎样才能杀死那些异步操作呢?