【发布时间】:2010-10-16 16:32:04
【问题描述】:
我有一个对象,其方法名为 StartDownload(),它启动三个线程。
如何在每个线程完成执行时收到通知?
有没有办法知道一个(或全部)线程是否已完成或仍在执行?
【问题讨论】:
-
看看Java 5 Barrier class
标签: java multithreading
我有一个对象,其方法名为 StartDownload(),它启动三个线程。
如何在每个线程完成执行时收到通知?
有没有办法知道一个(或全部)线程是否已完成或仍在执行?
【问题讨论】:
标签: java multithreading
我想最简单的方法是使用ThreadPoolExecutor 类。
挂钩方法
此类提供受保护的可重写
beforeExecute(java.lang.Thread, java.lang.Runnable)和afterExecute(java.lang.Runnable, java.lang.Throwable)方法,这些方法在执行每个任务之前和之后调用。这些可用于操纵执行环境;例如,重新初始化 ThreadLocals、收集统计信息或添加日志条目。此外,可以覆盖方法terminated()以执行任何需要在 Executor 完全终止后完成的特殊处理。
这正是我们所需要的。我们将覆盖afterExecute() 以在每个线程完成后获取回调,并将覆盖terminated() 以了解所有线程何时完成。
这就是你应该做的事情
创建一个执行器:
private ThreadPoolExecutor executor;
private int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
private void initExecutor() {
executor = new ThreadPoolExecutor(
NUMBER_OF_CORES * 2, //core pool size
NUMBER_OF_CORES * 2, //max pool size
60L, //keep aive time
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//Yet another thread is finished:
informUiAboutProgress(executor.getCompletedTaskCount(), listOfUrisToProcess.size());
}
}
};
@Override
protected void terminated() {
super.terminated();
informUiThatWeAreDone();
}
}
然后开始你的线程:
private void startTheWork(){
for (Uri uri : listOfUrisToProcess) {
executor.execute(new Runnable() {
@Override
public void run() {
doSomeHeavyWork(uri);
}
});
}
executor.shutdown(); //call it when you won't add jobs anymore
}
内部方法informUiThatWeAreDone();在所有线程完成后做任何你需要做的事情,例如,更新UI。
注意:不要忘记使用synchronized 方法,因为您并行工作,如果您决定从另一个synchronized 方法调用synchronized 方法,请务必小心!这通常会导致死锁
希望这会有所帮助!
【讨论】:
使用CyclicBarrier的解决方案
public class Downloader {
private CyclicBarrier barrier;
private final static int NUMBER_OF_DOWNLOADING_THREADS;
private DownloadingThread extends Thread {
private final String url;
public DownloadingThread(String url) {
super();
this.url = url;
}
@Override
public void run() {
barrier.await(); // label1
download(url);
barrier.await(); // label2
}
}
public void startDownload() {
// plus one for the main thread of execution
barrier = new CyclicBarrier(NUMBER_OF_DOWNLOADING_THREADS + 1); // label0
for (int i = 0; i < NUMBER_OF_DOWNLOADING_THREADS; i++) {
new DownloadingThread("http://www.flickr.com/someUser/pic" + i + ".jpg").start();
}
barrier.await(); // label3
displayMessage("Please wait...");
barrier.await(); // label4
displayMessage("Finished");
}
}
label0 - 创建循环屏障,参与方的数量等于执行线程的数量加上主执行线程的数量(其中正在执行 startDownload())
标签1 - 第n个DownloadingThread进入等候室
标签 3 - NUMBER_OF_DOWNLOADING_THREADS 个已进入等候室。执行的主线程释放它们以或多或少同时开始执行下载工作
标签 4 - 执行的主线程进入等候室。这是要理解的代码中“最棘手”的部分。哪个线程第二次进入等候室并不重要。重要的是,无论哪个线程最后进入房间,都要确保所有其他下载线程都完成了他们的下载工作。
标签 2 - 第 n 个 DownloadingThread 已完成下载工作并进入等候室。如果是最后一个,即已经有 NUMBER_OF_DOWNLOADING_THREADS 个进入,包括执行的主线程,只有在其他所有线程都完成下载后,主线程才会继续执行。
【讨论】:
查看 Thread 类的 Java 文档。您可以检查线程的状态。如果把三个线程放在成员变量中,那么三个线程都可以互相读取状态。
不过,您必须小心一点,因为您可能会导致线程之间出现竞争条件。尽量避免基于其他线程状态的复杂逻辑。绝对避免多个线程写入相同的变量。
【讨论】:
这是一个简单、简短、易于理解且非常适合我的解决方案。当另一个线程结束时,我需要绘制到屏幕上;但不能,因为主线程可以控制屏幕。所以:
(1) 我创建了全局变量:boolean end1 = false; 线程在结束时将其设置为 true。这是由“postDelayed”循环在主线程中拾取的,并在其中进行响应。
(2) 我的帖子包含:
void myThread() {
end1 = false;
new CountDownTimer(((60000, 1000) { // milliseconds for onFinish, onTick
public void onFinish()
{
// do stuff here once at end of time.
end1 = true; // signal that the thread has ended.
}
public void onTick(long millisUntilFinished)
{
// do stuff here repeatedly.
}
}.start();
}
(3) 幸运的是,“postDelayed”在主线程中运行,因此每秒检查一次另一个线程。当另一个线程结束时,这可以开始我们接下来想做的任何事情。
Handler h1 = new Handler();
private void checkThread() {
h1.postDelayed(new Runnable() {
public void run() {
if (end1)
// resond to the second thread ending here.
else
h1.postDelayed(this, 1000);
}
}, 1000);
}
(4) 最后,通过调用在代码中某处开始运行整个事情:
void startThread()
{
myThread();
checkThread();
}
【讨论】:
我建议查看 Thread 类的 javadoc。
您有多种线程操作机制。
您的主线程可以连续join() 三个线程,然后在三个线程都完成之前不会继续。
每隔一段时间轮询派生线程的线程状态。
将所有生成的线程放入一个单独的ThreadGroup 并轮询ThreadGroup 上的activeCount() 并等待它变为0。
为线程间通信设置自定义回调或侦听器类型的接口。
我敢肯定还有很多其他方法我仍然想念。
【讨论】:
在过去 6 年中,多线程方面发生了很多变化。
你可以使用join()和lock API,而不是使用
1.ExecutorServiceinvokeAll()API
执行给定的任务,当所有任务完成时返回一个包含状态和结果的 Futures 列表。
一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
CountDownLatch使用给定的计数进行初始化。由于调用countDown()方法,await 方法一直阻塞,直到当前计数达到零,之后所有等待的线程都被释放,任何后续的 await 调用立即返回。这是一次性现象——计数无法重置。如果您需要重置计数的版本,请考虑使用 CyclicBarrier。
3.ForkJoinPool 或 newWorkStealingPool() in Executors 是其他方式
4.从ExecutorService 上的提交遍历所有Future 任务,并通过Future 对象上的阻塞调用get() 检查状态
查看相关的 SE 问题:
How to wait for a thread that spawns it's own thread?
Executors: How to synchronously wait until all tasks have finished if tasks are created recursively?
【讨论】:
您还可以使用Executors 对象来创建ExecutorService 线程池。然后使用invokeAll 方法运行每个线程并检索Futures。这将阻塞,直到所有都完成执行。您的另一种选择是使用池执行每一个,然后调用awaitTermination 进行阻塞,直到池完成执行。添加任务后请务必致电shutdown()。
【讨论】:
您应该真的更喜欢使用java.util.concurrent 的解决方案。查找并阅读有关该主题的 Josh Bloch 和/或 Brian Goetz。
如果您不使用java.util.concurrent.* 并且负责直接使用线程,那么您可能应该使用join() 来了解线程何时完成。这是一个超级简单的回调机制。先扩展Runnable接口,使其有回调:
public interface CallbackRunnable extends Runnable {
public void callback();
}
然后创建一个 Executor 来执行你的 runnable 并在完成后给你回电。
public class CallbackExecutor implements Executor {
@Override
public void execute(final Runnable r) {
final Thread runner = new Thread(r);
runner.start();
if ( r instanceof CallbackRunnable ) {
// create a thread to perform the callback
Thread callerbacker = new Thread(new Runnable() {
@Override
public void run() {
try {
// block until the running thread is done
runner.join();
((CallbackRunnable)r).callback();
}
catch ( InterruptedException e ) {
// someone doesn't want us running. ok, maybe we give up.
}
}
});
callerbacker.start();
}
}
}
添加到CallbackRunnable 接口的另一种显而易见的事情是处理任何异常的方法,因此可以在其中放置一个public void uncaughtException(Throwable e); 行并在您的执行程序中安装一个 Thread.UncaughtExceptionHandler 将您发送到那个接口方法。
但是做这一切真的开始闻起来像java.util.concurrent.Callable。如果您的项目允许,您应该考虑使用java.util.concurrent。
【讨论】:
runner.join(),然后再调用任何你想要的代码,因为你知道线程已经完成。是否只是您可以将该代码定义为可运行对象的属性,这样您就可以为不同的可运行对象提供不同的东西?
runner.join() 是最直接的等待方式。我假设 OP 不想阻止他们的主调用线程,因为他们要求每次下载都得到“通知”,这可以按任何顺序完成。这提供了一种异步获取通知的方法。
您还可以使用具有内置属性更改支持的 SwingWorker。请参阅 addPropertyChangeListener() 或 get() 方法以获取状态更改侦听器示例。
【讨论】:
您可以通过多种方式做到这一点:
如何实现想法#5?好吧,一种方法是先创建一个接口:
public interface ThreadCompleteListener {
void notifyOfThreadComplete(final Thread thread);
}
然后创建以下类:
public abstract class NotifyingThread extends Thread {
private final Set<ThreadCompleteListener> listeners
= new CopyOnWriteArraySet<ThreadCompleteListener>();
public final void addListener(final ThreadCompleteListener listener) {
listeners.add(listener);
}
public final void removeListener(final ThreadCompleteListener listener) {
listeners.remove(listener);
}
private final void notifyListeners() {
for (ThreadCompleteListener listener : listeners) {
listener.notifyOfThreadComplete(this);
}
}
@Override
public final void run() {
try {
doRun();
} finally {
notifyListeners();
}
}
public abstract void doRun();
}
然后你的每个线程将扩展NotifyingThread,而不是实现run(),它将实现doRun()。因此,当它们完成时,它们会自动通知任何等待通知的人。
最后,在您的主类中——启动所有线程(或至少是等待通知的对象)的主类——将该类修改为implement ThreadCompleteListener,并在创建每个线程后立即将自身添加到侦听器列表中:
NotifyingThread thread1 = new OneOfYourThreads();
thread1.addListener(this); // add ourselves as a listener
thread1.start(); // Start the Thread
然后,当每个 Thread 退出时,您的 notifyOfThreadComplete 方法将与刚刚完成(或崩溃)的 Thread 实例一起调用。
请注意,对于NotifyingThread,最好使用implements Runnable 而不是extends Thread,因为在新代码中通常不鼓励扩展线程。但我正在为你的问题编码。如果您将NotifyingThread 类更改为实现Runnable,那么您必须更改一些管理线程的代码,这很容易做到。
【讨论】:
notify 方法,而不是在 run 方法中,而是在它之后。
您可以使用 getState() 查询线程实例,它返回具有以下值之一的 Thread.State 枚举实例:
* NEW
A thread that has not yet started is in this state.
* RUNNABLE
A thread executing in the Java virtual machine is in this state.
* BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.
* WAITING
A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
* TIMED_WAITING
A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
* TERMINATED
A thread that has exited is in this state.
但是我认为拥有一个等待 3 个子进程完成的主线程会是一个更好的设计,当其他 3 个子进程完成时,主线程会继续执行。
【讨论】:
您要等待他们完成吗?如果是这样,请使用 Join 方法。
如果您只想检查它,还有 isAlive 属性。
【讨论】:
Thread 告诉start,线程会这样做,但调用会立即返回。 isAlive 应该是一个简单的标志测试,但是当我用谷歌搜索时,方法是 native。