【问题标题】:How to cancel a task if new instance of task is started?如果启动了新的任务实例,如何取消任务?
【发布时间】:2018-12-21 02:24:21
【问题描述】:

我的应用程序包含一个ListView,每次选择一个项目时都会启动一个后台任务。然后,后台任务在成功完成后更新 UI 上的信息。

但是,当用户快速单击一个又一个项目时,所有这些任务都会继续,最后完成的任务会“获胜”并更新 UI,而不管最后选择的是哪个项目。

我需要以某种方式确保此任务在任何给定时间仅运行一个实例,因此在启动新任务之前取消所有先前的任务。

这是一个演示该问题的 MCVE:

import javafx.application.Application;
import javafx.concurrent.Task;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.control.ListView;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class taskRace  extends Application {

    private final ListView<String> listView = new ListView<>();
    private final Label label = new Label("Nothing selected");
    private String labelValue;

    public static void main(String[] args) {

        launch(args);
    }

    @Override
    public void start(Stage stage) throws Exception {

        // Simple UI
        VBox root = new VBox(5);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));
        root.getChildren().addAll(listView, label);

        // Populate the ListView
        listView.getItems().addAll(
                "One", "Two", "Three", "Four", "Five"
        );

        // Add listener to the ListView to start the task whenever an item is selected
        listView.getSelectionModel().selectedItemProperty().addListener((observableValue, oldValue, newValue) -> {

            if (newValue != null) {

                // Create the background task
                Task task = new Task() {
                    @Override
                    protected Object call() throws Exception {

                        String selectedItem = listView.getSelectionModel().getSelectedItem();

                        // Do long-running task (takes random time)
                        long waitTime = (long)(Math.random() * 15000);
                        System.out.println("Waiting " + waitTime);
                        Thread.sleep(waitTime);
                        labelValue = "You have selected item: " + selectedItem ;
                        return null;
                    }
                };

                // Update the label when the task is completed
                task.setOnSucceeded(event ->{
                    label.setText(labelValue);
                });

                new Thread(task).start();
            }

        });

        stage.setScene(new Scene(root));
        stage.show();

    }
}

按随机顺序单击多个项目时,结果是不可预测的。我需要更新标签以显示上次执行的Task 的结果。

我是否需要以某种方式安排任务或将它们添加到服务中才能取消所有先前的任务?

编辑:

在我的真实应用程序中,用户从ListView 中选择一个项目,后台任务读取一个数据库(一个复杂的SELECT 语句)以获取与该项目相关的所有信息。然后这些详细信息会显示在应用程序中。

发生的问题是当用户选择一个项目但更改了他们的选择时,应用程序中显示的返回数据可能是第一个选择的项目,即使现在选择了一个完全不同的项目。

可以完全丢弃从第一个(即:不需要的)选择返回的任何数据。

【问题讨论】:

  • 我能知道这个任务的真正用途吗?我的意思是该任务的业务规则是什么?然后当你说取消时,它的真正含义是什么?假设一个任务正在执行 REST 调用或执行存储过程……我们需要知道真正取消的任务是什么。
  • @NghiaDo - 查看我的编辑。谢谢。
  • 那么,最终用户是否应该等到任务完成后再允许用户选择新项目?
  • 不,我希望他们能够继续前进并随时选择新项目。
  • 一个任务可能需要的最长时间是多少?

标签: java java.util.concurrent


【解决方案1】:

正如this answer 所提到的,您的要求似乎是使用Service 的完美理由。 Service 允许您在任何给定时间以可重复使用的1 方式运行一个 Task。当您通过Service.cancel() 取消Service 时,它会取消底层TaskService 还为您跟踪自己的Task ,因此您无需将它们保存在某个列表中。

使用您的 MVCE,您想要做的是创建一个 Service 来包装您的 Task。每次用户在ListView 中选择一个新项目时,您都会取消Service,更新必要的状态,然后重新启动Service。然后您将使用Service.setOnSucceeded 回调将结果设置为Label。这保证了只有最后一次成功的执行才会返回给您。即使之前取消的Tasks 仍然返回结果,Service 也会忽略它们。

您也不必担心外部同步(至少在您的 MVCE 中)。所有处理启动、取消和观察Service 的操作都发生在FX 线程上。在 FX 线程上执行的唯一代码(如下所示)not 将在 Task.call() 内(好吧,当类被实例化时立即分配的字段,我相信这发生在 JavaFX-Launcher线程)。

这是使用 Service 的 MVCE 的修改版本:

import javafx.application.Application;
import javafx.concurrent.Service;
import javafx.concurrent.Task;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Label;
import javafx.scene.control.ListView;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class Main extends Application {

    private final ListView<String> listView = new ListView<>();
    private final Label label = new Label("Nothing selected");

    private final QueryService service = new QueryService();

    public static void main(String[] args) {
        launch(args);
    }

    @Override
    public void start(Stage stage) throws Exception {
        service.setOnSucceeded(wse -> {
            label.setText(service.getValue());
            service.reset();
        });
        service.setOnFailed(wse -> {
            // you could also show an Alert to the user here
            service.getException().printStackTrace();
            service.reset();
        });

        // Simple UI
        VBox root = new VBox(5);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));
        root.getChildren().addAll(listView, label);

        // Populate the ListView
        listView.getItems().addAll(
                "One", "Two", "Three", "Four", "Five"
        );

        listView.getSelectionModel().selectedItemProperty().addListener((observableValue, oldValue, newValue) -> {
            if (service.isRunning()) {
                service.cancel();
                service.reset();
            }
            service.setSelected(newValue);
            service.start();
        });

        stage.setScene(new Scene(root));
        stage.show();

    }

    private static class QueryService extends Service<String> {

        // Field representing a JavaFX property
        private String selected;

        private void setSelected(String selected) {
            this.selected = selected;
        }

        @Override
        protected Task<String> createTask() {
            return new Task<>() {

                // Task state should be immutable/encapsulated
                private final String selectedCopy = selected;

                @Override
                protected String call() throws Exception {
                    try {
                        long waitTime = (long) (Math.random() * 15_000);
                        System.out.println("Waiting " + waitTime);
                        Thread.sleep(waitTime);
                        return "You have selected item: " + selectedCopy;
                    } catch (InterruptedException ex) {
                        System.out.println("Task interrupted!");
                        throw ex;
                    }
                }

            };
        }

        @Override
        protected void succeeded() {
            System.out.println("Service succeeded.");
        }

        @Override
        protected void cancelled() {
            System.out.println("Service cancelled.");
        }

    }
}

当您调用Service.start() 时,它会创建一个Task 并使用其executor property 中包含的当前Executor 执行它。如果属性包含null,那么它使用一些未指定的默认Executor(使用守护线程)。

在上面,你看到我在取消和onSucceededonFailed 回调后调用reset()。这是因为 Service 只能在 READY state 中启动。如果需要,您可以使用 restart() 而不是 start()。基本相当于调用cancel()->reset()->start()

1Task 不会变得可重用。相反,Service 每次启动时都会创建一个新的Task


当您取消Service 时,它会取消当前正在运行的Task,如果有的话。即使ServiceTask 已被取消并不意味着执行实际上已停止。在 Java 中,取消后台任务需要与该任务的开发者合作。

这种合作的形式是定期检查执行是否应该停止。如果使用普通的RunnableCallable,则需要检查当前Thread 的中断状态或使用一些boolean 标志2。由于Task 扩展了FutureTask,您还可以使用从Future 接口继承的isCancelled() 方法。如果由于某种原因(称为外部代码,未使用 Task 等)您不能使用 isCancelled(),那么您可以使用以下方法检查线程中断:

你可以通过Thread.currentThread()获得对当前线程的引用

在您的后台代码中,如果当前线程已被中断、boolean 标志已设置或Task 已被取消,您需要在适当的位置检查。如果有,那么您将执行任何必要的清理并停止执行(通过返回或抛出异常)。

另外,如果你的Thread 正在等待一些可中断的操作,比如阻塞 IO,那么它会在被中断时抛出一个InterruptedException。在您的 MVCE 中,您使用可中断的 Thread.sleep;这意味着当您调用取消时,此方法将抛出上述异常。

当我在上面说“清理”时,我的意思是任何必要的清理在后台,因为你仍然在后台线程上。如果您需要清理 FX 线程上的任何内容(例如更新 UI),则可以使用 ServiceonCancelled 属性。

在上面的代码中,您还会看到我使用了受保护的方法 succeeded()cancelled()TaskService 都提供这些方法(以及各种 Worker.States 的其他方法),它们将始终在 FX 线程上调用。但是请阅读文档,因为ScheduledService 要求您为其中一些方法调用超级实现。

2如果使用boolean 标志,请确保对其的更新对其他线程可见。您可以通过将其设为volatile、同步它或使用java.util.concurrent.atomic.AtomicBoolean 来做到这一点。

【讨论】:

  • 谢谢。这实际上工作得很好!适应自定义对象确实需要一些工作(我创建了一个新对象来保存所需的各个数据对象),但我喜欢它。其他后台任务还在执行,但是由于只有2个数据库查询,我不能取消第一个,第二个无论如何都超级快)。
  • @Zephyr 很高兴能帮上忙。当您弄清楚时,我实际上才刚刚开始回答您的问题,但是无论如何我都会重点说明我要说的内容。 (1) 记住 JavaFX 的黄金法则:永远不要从 JavaFX 应用程序线程以外的线程直接或间接更新 UI,(2) 最好有输入和输出Service 的对象是不可变的;但如果它们不能,那么只需确保正确防范多线程(维护 happens-before 关系),(3)Task 应该收到输入的副本,所以状态.. .(续)
  • ... 在执行 Task 时不会更改(当输入不可变时,只需复制引用),并且 (4) 我建议将 UI 代码与 Service 分开,除非它已经是控制器的内部类。明显的例外是您在 setOnXXX 回调中所做的事情。
  • 我实际上是让任务创建一个新对象并返回它。输入只是告诉任务要在数据库上运行什么查询,并使用该数据更新新对象。
  • @Zephyr 我相信那些protected 方法还有更多,所以你可以在Service 中进行某些清理内部。例如,ScheduledService 使用其中一些来更新所需的任何属性,然后重新安排自身。此外,您可以使用failed() 方法来始终记录故障,而不是将该行为编码到setOnFailed。但不,我不认为它们对你的情况是必要的。我主要将它们放在我的示例中,以简单地显示它们的存在。
【解决方案2】:

如果由于某种原因,您无法使用基于取消的解决方案(参见my answerSlaw's answer,实际上是所有其他答案)使其在您的实际用例中工作,那么您还有一件事可以试试。

这里提出的解决方案在理论上比任何基于取消的解决方案都更糟糕(因为所有冗余计算将一直持续到完成,从而浪费资源)。但是,实际上,一个有效的解决方案会比一个无效的解决方案更好(前提是它对您有效,并且它是唯一一个有效的解决方案 - 所以请先检查Slaw's solution)。


我建议您完全放弃取消,而是使用简单的volatile 字段来存储最后选择的项目的值:

private volatile String lastSelectedItem = null;

选择新标签后,您将更新此字段:

lastSelectedItem = task.selectedItem;

然后,在 on-succeeded 事件中,您只需检查是否允许分配计算结果:

if (task.selectedItem.equals(lastSelectedItem))

在下面找到整个修改后的 MCVE:

public class TaskRaceWithoutCancelation extends Application {

    private final ListView<String> listView = new ListView<>();
    private final Label label = new Label("Nothing selected");

    private final long startMillis = System.currentTimeMillis();

    private volatile String lastSelectedItem = null;

    public static void main(String[] args) {
        launch(args);
    }

    @Override
    public void start(Stage stage) {

        // Simple UI
        VBox root = new VBox(5);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));
        root.getChildren().addAll(listView, label);

        // Populate the ListView
        listView.getItems().addAll(
                "One", "Two", "Three", "Four", "Five"
        );


        // Add listener to the ListView to start the task whenever an item is selected
        listView.getSelectionModel().selectedItemProperty().addListener((observableValue, oldValue, newValue) -> {

            if (newValue != null) {
                // Create the background task
                MyTask task = new MyTask();
                lastSelectedItem = task.selectedItem;

                // Update the label when the task is completed
                task.setOnSucceeded(event -> {
                    if (task.selectedItem.equals(lastSelectedItem)) {
                        label.setText(task.getValue());
                        println("Assigned " + task.selectedItem);
                    }
                });

                new Thread(task).start();
            }

        });

        stage.setScene(new Scene(root));
        stage.show();

    }

    private void println(String string) {
        System.out.format("%5.2fs: %s%n", 0.001 * (System.currentTimeMillis() - startMillis), string);
    }

    private class MyTask extends Task<String> {

        final String selectedItem = listView.getSelectionModel().getSelectedItem();

        @Override
        protected String call() {
            int ms = new Random().nextInt(10000);
            println(String.format("Will return %s in %.2fs", selectedItem, 0.001 * ms));

            // Do long-running task (takes random time)
            long limitMillis = System.currentTimeMillis() + ms;
            while (System.currentTimeMillis() < limitMillis) {
            }

            println("Returned " + selectedItem);
            return "You have selected item: " + selectedItem;
        }
    }
}

【讨论】:

    【解决方案3】:

    满足你需要的最简单的代码是这样的:

    class SingleTaskRunner {
    
        private Task<?> lastTask = null; 
    
        void runTask(Task<?> task) {
            registerTask(task);
            new Thread(task).start();
        }
    
        private synchronized void registerTask(Task<?> task) {
            if (lastTask != null) {
                lastTask.cancel(true);
            }
            lastTask = task;
        }
    }
    

    请注意,它与Nghia Do's answer 非常相似,只是有同步并且没有所有这些额外的演示代码。

    您评论说您对上述答案如何让您取消所有以前的任务感到困惑,但事实是 - 当您正确同步时(如在 registerTask 中) - 您只需要存储最后一个任务,因为所有以前的任务将已经被取消(通过相同的registerTask 方法,由于同步,该方法永远不会同时执行)。据我所知,在已经完成的Task 上调用cancel 是无操作的。

    此外,在new Thread(task).start() 上使用ExecutorService(就像Nghia Do 建议的那样)也是一个好主意,因为Thread creation is expensive


    编辑:抱歉,我之前没有尝试过您的 MCVE。现在我有,而且我无法重现该行为,我注意到您的 MCVE 中存在以下缺陷:

    1. 您正在使用Thread.sleep() 来模拟计算,但是在Thread.sleep() 内部调用Future.cancel() 会立即终止任务,因为Future.cancel() calls Thread.interrupt() 在内部,并且Thread.sleep() 检查中断标志,您很可能不要在你的真实代码中做。

    2. 您正在将计算结果分配给私有字段。由于(如上所述)计算在您的原始 MCVE 中立即中断,并且可能在您的实际代码中没有被中断,因此在您的实际代码中该字段的值可能会被已取消的任务覆盖。

    3. 我相信,在您的真实代码中,您可能做的不仅仅是将结果关联到字段中。在原始的 MCVE 中,即使将 Thread.sleep() 替换为循环后,我也无法重现该行为。原因是当任务被取消时,onSucceeded 处理程序确实被调用(这很好)。不过,我相信,在您的真实代码中,您可能在Task.call() 中做了比您在 MCVE 中显示的更重要的事情(例如一些 GUI 更新),因为否则,据我所知,您不会遇到了原来的问题。

    这是对您的 MCVE 的修改,它应该出现原始问题。

    编辑:我进一步修改了 MCVE 以打印取消信息并打印事件时间。但是,此代码不会重现 OP 所写的行为。

    public class TaskRace extends Application {
    
        private final ListView<String> listView = new ListView<>();
        private final Label label = new Label("Nothing selected");
        private final SingleTaskRunner runner = new SingleTaskRunner();
    
        private final long startMillis = System.currentTimeMillis();
    
        public static void main(String[] args) {
            launch(args);
        }
    
        @Override
        public void start(Stage stage) {
    
            // Simple UI
            VBox root = new VBox(5);
            root.setAlignment(Pos.CENTER);
            root.setPadding(new Insets(10));
            root.getChildren().addAll(listView, label);
    
            // Populate the ListView
            listView.getItems().addAll(
                    "One", "Two", "Three", "Four", "Five"
            );
    
    
            // Add listener to the ListView to start the task whenever an item is selected
            listView.getSelectionModel().selectedItemProperty().addListener((observableValue, oldValue, newValue) -> {
    
                if (newValue != null) {
                    // Create the background task
                    MyTask task = new MyTask();
    
                    // Update the label when the task is completed
                    task.setOnSucceeded(event -> {
                        label.setText(task.getValue());
                        println("Assigned " + task.selectedItem);
                    });
                    task.setOnCancelled(event -> println("Cancelled " + task.selectedItem));
    
                    runner.runTask(task);
                }
    
            });
    
            stage.setScene(new Scene(root));
            stage.show();
    
        }
    
        private void println(String string) {
            System.out.format("%5.2fs: %s%n", 0.001 * (System.currentTimeMillis() - startMillis), string);
        }
    
        private class MyTask extends Task<String> {
    
            final String selectedItem = listView.getSelectionModel().getSelectedItem();
    
            @Override
            protected String call() {
                int ms = new Random().nextInt(10000);
                println(String.format("Will return %s in %.2fs", selectedItem, 0.001 * ms));
    
                // Do long-running task (takes random time)
                long limitMillis = System.currentTimeMillis() + ms;
                while (System.currentTimeMillis() < limitMillis) {
                }
    
                println("Returned " + selectedItem);
                return "You have selected item: " + selectedItem;
            }
        }
    }
    

    【讨论】:

    • 这似乎没有任何区别。在上面的 MCVE 中,我已将 new Thread(task).start(); 行替换为 runner.runTask(task); 并且 runner 是您的 SingleTaskRunner 类的一个实例。结果和原来的问题一样。
    • 我又困惑了。除了使用循环而不是 Thread.sleep(),您还做了哪些更改以及为什么会这样?我无法更新我的实际程序来简单地使用循环......
    • @Zephyr 我会试着解释一下。第 1 点涉及 MCVE 中的 verifiability 问题(因此您绝对不必在真实代码中引入任何循环)。第 2 点和第 3 点涉及 MCVE 和您的真实代码 - 如果在您的真实代码中,您对 Task.call() 内的 Application 实例进行任何状态修改(在您的 MCVE 中,您可以通过变异labelValue),请尝试将它们移动到Task.setOnSucceeded,并尝试使用Task.getValue() 将计算结果from Task.call() to 你的@ 987654355@(而不是改变Applications 状态)。
    • 感谢您的尝试,但我仍有问题。事实上,在我的真实应用程序中,任务永远不会被取消,我不知道为什么。我看到 lastTask.cancel() 正在执行,但我的 onCancelled() 方法没有。 onSucceeded() 方法总是被执行。
    • 你可以查看lastTask.cancel()的返回值(这里调试器是你的朋友)。可能是false,表示任务没有被取消,因为它已经完成了。我不知道,也许您将计算委托给其他线程,并且任务立即完成?除非您提供更接近真实代码复杂性的 MCVE,否则很难提供帮助,因为它涉及太多猜测。 PS。我在我的回答中更新了修改后的 MCVE 以打印取消信息。您可能会尝试进一步调整它以使其更像您的真实代码。
    【解决方案4】:

    调用javafx.concurrent.Service.cancel() 将取消任何当前正在运行的Task,使其抛出InterruptedException

    取消任何当前正在运行的任务,如果有的话。状态将设置为CANCELLED
    ~ Service (JavaFX 8) - cancel ~

    如果需要额外的清理,可以提供Task.onCancelled() 事件处理程序。

    【讨论】:

    • 鉴于原始问题中的 MCVE,在哪里可以调用 cancel() 方法?是否需要将每个任务添加到列表中,然后进行迭代,取消所有任务?
    • 如果进程是IO绑定的,它只会抛出中断异常。无法保证在现实世界中工作任务处于睡眠状态。
    【解决方案5】:

    演示类

    package com.changehealthcare.pid;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class ThreadFun {
        public static ExecutorService threadExecutor = Executors.newFixedThreadPool(1);
        public static ServiceThread serviceThreads = new ServiceThread();
        public static Future futureService;
    
        public static void main(String[] args) throws Exception {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String line = "";
    
               while (line.equalsIgnoreCase("quit") == false) {
                   line = in.readLine();
                   //do something
                   if (futureService != null) {
                       System.out.println("try to cancel");
                       futureService.cancel(true);
                   }
                   futureService = threadExecutor.submit(serviceThreads);
               }
    
               in.close();
    
        }
    }
    

    ServiceThread 类

    package com.changehealthcare.pid;
    
    public class ServiceThread implements Runnable {
    
        @Override
        public void run() {
            System.out.println("starting the service");
    
            boolean isResult = this.doStuff();
    
            if (isResult) {
                this.displayStuff();
            }
            System.out.println("done");
        }
    
        private boolean doStuff() {
            try {
                for (int i=0; i<=10; i++) {
                    System.out.println(i);
                    Thread.sleep(1000);
                }
    
                return true;
            }
            catch (InterruptedException e) {
                System.out.println("cancelled");
                return false;
            }
        }
    
        private void displayStuff() {
            System.out.println("display done");
        }
    }
    

    解释 演示应用程序充当 System.in(键盘)的侦听器。如果按下任何字符(使用 Enter),它将取消任何当前线程(如果存在)并提交一个新线程。

    服务线程演示有一个 run() 方法,它调用 doStuff() 和 displayStuff()。您可以为 displayStuff 注入 UI 部分。在 doStuff() 中,它正在捕获 InterupptedException,如果发生该异常,我们不执行 displayStuff()。

    请检查它是否适合您的情况。我们可以锻炼更多。

    【讨论】:

    • 在这种情况下,OP 希望最终将线程的结果返回到父/绘图线程。提交Callable&lt;String&gt; 而不是Runnable 会更相关。这样你最终会得到 Future&lt;String&gt; 而不是 Future&lt;?&gt;
    • 不过,我很困惑。这在哪里可以让我在启动新任务时取消所有以前的任务?
    • 前一个任务总是最多的
    猜你喜欢
    • 2017-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-24
    • 2017-10-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多