【问题标题】:Java ExecutorService - why does this program keep running?Java ExecutorService - 为什么这个程序继续运行?
【发布时间】:2015-08-30 22:47:50
【问题描述】:

我正在尝试构建类似于后台任务执行器的东西,如果没有答案,它会在一段时间后终止后台任务(后台任务调用 web 服务,它们可能会超时,但我需要确保它们在特定时间下超时时间)

所以我把它作为一个实验,但如果我运行它,程序不会终止。我想知道是不是因为后台线程仍然处于活动状态?我怎样才能关闭它?

public class Test {

public static class Task implements Callable<Object> {

    @Override
    public Object call() throws Exception {
        while(true) {}
    }

}

public static void main(String[] args) {
    try {
        Task t = new Task();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.invokeAll(Arrays.asList(t), 5L, TimeUnit.SECONDS);
        executor.shutdown();
        System.out.println("DONE");
    } 
    catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

【问题讨论】:

  • 上述代码中的Task在哪里关闭?没有我能看到的地方。

标签: java multithreading executorservice


【解决方案1】:

ExecutorService 不会杀死正在运行的线程,并且由于线程是作为非守护进程创建的,因此 JVM 不会退出。

发生的情况是,当超时到期时,invokeAll() 返回的期货被取消,这意味着在未来对象上设置了一个标志,如果你尝试调用你会得到一个CancellationException future.get()。然而,invokeAll() 和 shutdown()(或 shutdownNow())都没有做任何事情来杀死线程。

请注意,您甚至不能自己杀死线程。您所能做的就是设置一些特定于应用程序的标志或调用Thread.interrupt(),但即使这样也不能保证the thread terminates

【讨论】:

  • 有没有办法可以杀死线程呢?我的意思是在线程本身之外。
  • 我添加了几段。基本上,没有。您可以设置一个标志或调用 thread.interrupt(),但如果您的线程在上述情况下没有被阻塞(例如在您的示例中),基本上似乎您无法销毁它
  • @breakline “杀死”线程的方法是使其可中断:编写run 方法,使其通过优雅退出来响应中断。在您的示例中,将 while (true) {} 替换为 Thread.sleep(Long.MAX_VALUE) 就足够了。
【解决方案2】:

Winterbe 有一篇关于执行者如何工作的精彩帖子。这是他教程的摘录

所以基本上 executor 总是一直在监听新任务或 callables/runnables,关闭 executor 或停止 executor 监听的一种方法是中断它正在执行的任何任务。一种方法是调用 future.get() ,它在主线程时停止,暂停它并确保当前线程在将资源移交给其他线程之前完全执行

您可能拥有更多的线程并编写代码以在 InterruptedException 块中正常关闭

这是我编写和测试过的示例代码:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorTest {

    public static void main(String[] args) {

        ExecutorService service = Executors.newWorkStealingPool(10);

        Callable<AccountClass> newInstance = () -> {
            TimeUnit.SECONDS.sleep(3);
            return getAcc(Thread.currentThread().getId());
        };

        // for now only one instance is added to the list
        // List<Callable<AccountClass>> callablesSingleList = Arrays.asList(newInstance);

        // adding multipleCallalbes
        List<Callable<AccountClass>> callablesMultipleList = Arrays.asList(
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                },
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                },
                () -> {
                    TimeUnit.SECONDS.sleep(3);
                    return getAcc(Thread.currentThread().getId());
                });

        try {
            service.invokeAll(callablesMultipleList).stream().map(future -> {
                AccountClass fuClass = null;
                try {
                    fuClass = future.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                return fuClass;
            }).forEach(getValue -> {
                System.out.println("retunred value:" + getValue);
            });
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

    }

    private static AccountClass getAcc(long itr) {
        // probably call DB for every new thread iterator
        System.out.println("getting the current thread" + itr);
        AccountClass ac = new AccountClass();
        ac.setId(itr);
        ac.setName("vv");
        ac.setRole("admin");
        System.out.println("sending the accnt class:" + ac);
        return ac;
    }
}

更新:

关闭执行器的另一种方法是使用 service.shutDownNow() -> 关闭程序,即使它正在执行。您可以使用 awaitTermination 方法来指定您是否觉得可能需要几分钟才能完成执行,然后可能会关闭服务

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorScheduleFixedRate {

    public static void main(String[] args) {

        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

        Runnable task = () -> {
            getAcc(33);
        };

        service.scheduleWithFixedDelay(task, 10, 5, TimeUnit.SECONDS);

        if (!service.isShutdown()) {
            List<Runnable> list2 = service.shutdownNow();
            System.out.println(list2);
            System.out.println("is shutdonw" + service.isShutdown());
            System.out.println("Do something after the thread execution");
        }

    }

    private static AccountClass getAcc(long itr) {
        // probably call DB for every new thread iterator
        System.out.println("getting the current thread" + itr);
        AccountClass ac = new AccountClass();
        ac.setId(itr);
        ac.setName("vv");
        ac.setRole("admin");
        System.out.println("sending the accnt class:" + ac);
        return ac;
    }

}

【讨论】:

  • 记住最重要的是中断执行流程,这可以通过使用future,get()来实现,以防线程任务实现可调用接口。
猜你喜欢
  • 2015-07-11
  • 1970-01-01
  • 2012-11-11
  • 2013-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多