【问题标题】:In a Java 7+ ForkJoinPool, is it possible to cancel a task and all subtasks?在 Java 7+ ForkJoinPool 中,是否可以取消任务和所有子任务?
【发布时间】:2014-05-26 11:26:51
【问题描述】:

我的程序通过分而治之的方法搜索问题的解决方案(任何解决方案),使用递归和RecursiveTasks's 实现:我为部门的第一个分支分叉一个任务,然后递归到第二个分支:如果第二个分支找到了解决方案,那么我取消第一个分支,否则我等待它的结果。

这可能不是最优的。如果找到解决方案,一种方法是让任何已启动的任务抛出异常。但是,我将如何取消所有已启动的任务?取消一个任务是否也会取消所有子任务?

【问题讨论】:

标签: java java.util.concurrent fork-join


【解决方案1】:

您可以使用任务管理器的简单方法。例如:

public class TaskManager<T> {

private List<ForkJoinTask<T>> tasks;

public TaskManager() {
    tasks = new ArrayList<>();
}

public void addTask(ForkJoinTask<T> task) {
    tasks.add(task);
}

public void cancelAllExcludeTask(ForkJoinTask<Integer> cancelTask) {
    for (ForkJoinTask<T> task : tasks) {
        if (task != cancelTask) {
            task.cancel(true);
        }
    }
}

public void cancelTask(ForkJoinTask<Integer> cancelTask) {
    for (ForkJoinTask<T> task : tasks) {
        if (task == cancelTask) {
            task.cancel(true);
        }
    }
}

}

还有任务:

public class YourTask extends RecursiveTask<Integer> {

private TaskManager<Integer> taskManager;

@Override
protected Integer compute() {
        // stuff and fork
        newTask.fork();
        // do not forget to save in managers list
        taskManager.addTask(newTask);

        // another logic

        // if current task should be cancelled            
        taskManager.cancelTasks(this);

        // or if you have decided to cancel all other tasks
        taskManager.cancelAllExcludeTask(this);
}
}

【讨论】:

  • cancelAllExcludeTask() 的问题在于,当任务持有资源或具有其他一些依赖项时,可能存在内部问题。它归结为与 Thread.stop() 相同的问题。没有框架可以知道任何任务正在做什么,因此尝试停止任务总是很危险的。
  • 如果你应该关心任务分配的资源,仍然可以直接扩展 FutureTask 并覆盖它的 cancel() 方法。然后将此类任务提交给执行者。
  • 您确定RecursiveTask.cancel(true) 真的停止了任务吗?因为ForkJoinTask 中的cancel() 实现(RecursiveTask 继承自其中),只需设置一个cancelled 标志,但不会中断线程。另见stackoverflow.com/questions/21320156/…
  • cancel 调用 setCompletion(int completion) 可能会发出: synchronized (this) { notifyAll();这可能会对其他任务产生意想不到的后果,如上所述,它不会中断当前线程。取消任务及其所有子任务(原始问题)的唯一确定/安全方法是自己手动编写逻辑代码。
【解决方案2】:

框架无法取消任务,原因与您无法取消线程相同。出于所有原因,请参阅 Thread.stop() 上的文档。任务可以持有什么锁?它可以与哪些外部资源相关联?所有相同的 Thread.stop() 原因也适用于任务(毕竟,任务在线程下运行。)您需要告诉任务停止,就像告诉线程停止一样。

我管理另一个使用 scatter-gather 技术的 fork/join 项目。我进行取消或短路的方式是,我创建的每个任务都会传递一个对象 (PassObject),该对象具有

protected volatile boolean stop_now = false;

以及停止任务的方法

protected void stopNow() {stop_now = true; }

每个任务都会定期检查 stop_now,当它为 true 时,它​​会优雅地结束任务。

不幸的是,stop_now 需要是 volatile 的,因为另一个线程要设置它。如果您经常检查,这可能会增加大量开销。

如何在另一个任务中设置此字段有点棘手。我创建的每个任务还​​包含对其他所有任务的引用数组的引用

int nbr_tasks = nbr_elements / threshold;
// this holds the common class passed to each task
PassObject[] passList = new PassObject[nbr_tasks];
 for (int i = 0; i < nbr_tasks; i++) 
passList[i] = new PassObject( passList,… other parms);

一旦列表形成,我 fork() passList 中的每个对象。每个 PassObject 都包含对数组 passList 的引用,该数组包含对传递给每个任务的每个对象的引用。因此,每个任务都知道其他所有任务,当一个任务想要取消其他任务时,它只需调用 cancelOthers 方法并引用 passList。

private void cancelOthers (PassObject[] others) {
// tell all tasks to stop
 for (int i = 0, max = others.length; i < max; i++)
others[i].stopNow();

如果您使用的是 Java8,那么您可以使用 CountedCompler 类而不是 RecusiveTask 进行分散收集。对于 Java7 或者如果您仍想使用 RecursiveTask,那么递归中的第一个任务需要创建一个 AtomicBoolean 字段 (AtomicBoolean stop_now = new AtomicBoolean(false);) 并在它创建的每个新 RecursiveTask 中包含对该字段的引用。使用递归,你不知道一开始需要多少层任务。 同样,您需要在代码中定期检查布尔值是否为真,如果为真,则优雅地结束任务。

以上只是如何取消的提示。每个应用程序都是不同的。我所做的适用于我的应用程序——但逻辑是相同的。您需要一个任务可以设置并且每个其他任务都可以看到的每个任务中的共同点。

我会添加更多代码,但插入的代码一次只占用一行,不实用。

【讨论】:

  • 怎么样?种族无关紧要。只要唯一的修改是设置为false。
  • “我创建的每个任务还​​包含对所有其他任务的引用数组的引用。” - 如果每个任务只知道它的直接子任务(例如它自己创建的任务),这还不够吗?
  • 我使用分散收集技术,在开始时创建所有必要的任务。这不是递归分解技术。如果您使用递归来创建子任务,即创建子任务,那么您应该使用 AtomicBoolean 方法并将该引用传递给每个子任务。但是,仅仅取消一个子任务链是不够的。您需要取消所有任务。
  • InterruptedException 的便利之处在于,如果您调用第 3 方方法,它们会遵守 InterruptedException 并且 stop_now 只能由您的代码支持
  • 第三方方法做他们想做的事。无法保证任何第三方方法将遵守任何标准程序。这个帖子有 16 个月了,你参加聚会有点晚了。
猜你喜欢
  • 2016-01-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多