【问题标题】:How to Wait for Completion of ANY Worker Thread?如何等待任何工作线程的完成?
【发布时间】:2017-12-16 06:39:02
【问题描述】:

我想要一个调度线程,它可以从工作线程池中执行和检索结果。调度程序需要不断地向工作线程提供工作。当任何工作线程完成时,调度程序需要收集其结果并重新调度(或创建新的)工作线程。在我看来,这应该是显而易见的,但我一直无法找到合适模式的示例。 Thread.join() 循环是不够的,因为那是真正的“与”逻辑,我正在寻找“或”逻辑。

我能想到的最好的办法是让调度程序线程wait() 并在完成后让工作线程notify()。虽然似乎我必须防止两个同时结束的工作线程导致调度程序线程错过notify()。另外,这对我来说似乎有点不雅。

更不优雅的是调度线程定期唤醒并轮询工作线程池并检查每个线程以查看它是否已通过isAlive() 完成。

我查看了java.util.concurrent 并没有看到任何看起来符合这种模式的东西。

我觉得要实现我上面提到的内容将涉及大量防御性编程和重新发明轮子。一定有一些我想念的东西。我可以利用什么来实现这种模式?

这是单线程版本。 putMissingToS3() 将成为调度程序线程,uploadFileToBucket() 中表示的功能将成为工作线程。

private void putMissingToS3()
{
    int reqFilesToUpload = 0;
    long reqSizeToUpload = 0L;

    int totFilesUploaded = 0;
    long totSizeUploaded = 0L;
    int totFilesSkipped = 0;
    long totSizeSkipped = 0L;

    int rptLastFilesUploaded = 0;
    long rptSizeInterval = 1000000000L;
    long rptLastSize = 0L;
    StopWatch rptTimer = new StopWatch();
    long rptLastMs = 0L;


    StopWatch globalTimer = new StopWatch();
    StopWatch indvTimer = new StopWatch();

    for (FileSystemRecord fsRec : fileSystemState.toList())
    {
        String reqKey = PathConverter.pathToKey(PathConverter.makeRelativePath(fileSystemState.getRootPath(), fsRec.getFullpath()));

        LocalS3MetadataRecord s3Rec = s3Metadata.getRecord(reqKey);

        // Just get a rough estimate of what the size of this upload will be 
        if (s3Rec == null)
        {
            ++reqFilesToUpload;
            reqSizeToUpload += fsRec.getSize();
        }
    }

    long uploadTimeGuessMs = (long)((double)reqSizeToUpload/estUploadRateBPS*1000.0);

    printAndLog("Estimated upload: " + natFmt.format(reqFilesToUpload) + " files, " + Utils.readableFileSize(reqSizeToUpload) + 
            ", Estimated time " + Utils.readableElapsedTime(uploadTimeGuessMs));

    globalTimer.start();
    rptTimer.start();
    for (FileSystemRecord fsRec : fileSystemState.toList())
    {
        String reqKey = PathConverter.pathToKey(PathConverter.makeRelativePath(fileSystemState.getRootPath(), fsRec.getFullpath()));

        if (PathConverter.validate(reqKey))
        {
            LocalS3MetadataRecord s3Rec = s3Metadata.getRecord(reqKey);

            //TODO compare and deal with size mismatches.  Maybe go and look at last-mod dates.
            if (s3Rec == null)
            {
                indvTimer.start();
                uploadFileToBucket(s3, syncParms.getS3Bucket(), fsRec.getFullpath(), reqKey);
                indvTimer.stop();

                ++totFilesUploaded;
                totSizeUploaded += fsRec.getSize();

                logOnly("Uploaded: Size=" + fsRec.getSize() + ", " + indvTimer.stopDeltaMs() + " ms, File=" + fsRec.getFullpath() + ", toKey=" + reqKey);

                if (totSizeUploaded > rptLastSize + rptSizeInterval)
                {
                    long invSizeUploaded = totSizeUploaded - rptLastSize;

                    long nowMs = rptTimer.intervalMs();
                    long invElapMs = nowMs - rptLastMs;
                    long remSize = reqSizeToUpload - totSizeUploaded;
                    double progessPct = (double)totSizeUploaded/reqSizeToUpload*100.0;
                    double mbps = (invElapMs > 0) ? invSizeUploaded/1e6/(invElapMs/1000.0) : 0.0;
                    long remMs = (long)((double)remSize/((double)invSizeUploaded/invElapMs));

                    printOnly("Progress: " + d2Fmt.format(progessPct) + "%, " + Utils.readableFileSize(totSizeUploaded) + " of " + 
                            Utils.readableFileSize(reqSizeToUpload) + ", Rate " + d3Fmt.format(mbps) + " MB/s, " + 
                            "Time rem " + Utils.readableElapsedTime(remMs));

                    rptLastMs = nowMs;
                    rptLastFilesUploaded = totFilesUploaded;
                    rptLastSize = totSizeUploaded;
                }
            }
        }
        else
        {
            ++totFilesSkipped;
            totSizeSkipped += fsRec.getSize();

            logOnly("Skipped (Invalid chars): Size=" + fsRec.getSize() + ", " + fsRec.getFullpath() + ", toKey=" + reqKey);

        }

    }

    globalTimer.stop();

    double mbps = 0.0;

    if (globalTimer.stopDeltaMs() > 0)
        mbps = totSizeUploaded/1e6/(globalTimer.stopDeltaMs()/1000.0);

    printAndLog("Actual upload: " + natFmt.format(totFilesUploaded) + " files, " + Utils.readableFileSize(totSizeUploaded) + 
            ", Time " + Utils.readableElapsedTime(globalTimer.stopDeltaMs()) + ", Rate " + d3Fmt.format(mbps) + " MB/s");

    if (totFilesSkipped > 0)
        printAndLog("Skipped Files: " + natFmt.format(totFilesSkipped) + " files, " + Utils.readableFileSize(totSizeSkipped)); 
}

private void uploadFileToBucket(AmazonS3 amazonS3, String bucketName, String filePath, String fileKey)
{    
    File inFile = new File(filePath);

    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.addUserMetadata(Const.LAST_MOD_KEY, Long.toString(inFile.lastModified()));
    objectMetadata.setLastModified(new Date(inFile.lastModified()));

    PutObjectRequest por = new PutObjectRequest(bucketName, fileKey, inFile).withMetadata(objectMetadata);

    // Amazon S3 never stores partial objects; if during this call an exception wasn't thrown, the entire object was stored.  
    amazonS3.putObject(por);  
}

【问题讨论】:

  • “收集结果”是什么意思?调度员需要做什么工作线程内无法完成的事情?
  • @shmosel 聚合所有线程的进度并智能调度新工作,其中可能包括减少或增加工作线程的数量。
  • 这不是意味着等待所有线程完成吗?
  • @shmosel 否。最终所有线程都会完成,但这项工作可能会运行数周。所以每个单独的线程只会知道它在更大工作负载中的一小部分。在此期间,可能需要调整执行线程的数量以利用可用的网络带宽或对其进行限制。此外,包括重新启动失败线程在内的整体工作负载管理只能由调度程序完成。所以还有更多工作要做,只是等待线程完成。

标签: java multithreading


【解决方案1】:

我认为你的包装是正确的。你应该使用 ExecutorService API。 这消除了等待和观察线程通知的负担。 示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;


public class ExecutorEx{
    static class ThreadA implements Runnable{
        int id;
        public ThreadA(int id){
            this.id = id;
        }
        public void run(){
            //To simulate some work
            try{Thread.sleep(Math.round(Math.random()*100));}catch(Exception e){}
            // to show message
            System.out.println(this.id + "--Test Message" + System.currentTimeMillis());
        }
    }

    public static void main(String args[]) throws Exception{
        int poolSize = 10;
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        int i=0;
        while(i<100){
            pool.submit(new ThreadA(i));
            i++;
        }
        pool.shutdown();
        while(!pool.isTerminated()){
            pool.awaitTermination(60, TimeUnit.SECONDS);
        }
    }
}

如果你想从你的线程返回一些东西,你需要实现 Callable 而不是 Runnable(call() 而不是 run()) 并在 Future 对象数组中收集返回的值,你可以稍后迭代。

【讨论】:

  • 您要提交主题的依据是什么?下一个提交是否依赖于上一个(这听起来是单线程方法)?这将有助于我们了解您是否可以至少共享一个调度程序的示例代码。
  • 我将一个串行进程分解为多个线程以显着减少时间。这是一项可能运行数天或数周的作业,因此我需要能够跟踪和报告进度,并根据吞吐率增加或减少活动线程的数量。因此,从每个线程的完成中获取结果将允许计算整体进度和传输率。
  • 只是一个想法:如何将所有未来对象传递给另一个观察者线程实例,该实例可以相应地改变调度程序(线程数等),同时观察者线程也可以将失败的对象放回输入队列。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-12-07
  • 2014-07-17
  • 1970-01-01
  • 2010-11-18
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多