【发布时间】:2017-12-16 06:39:02
【问题描述】:
我想要一个调度线程,它可以从工作线程池中执行和检索结果。调度程序需要不断地向工作线程提供工作。当任何工作线程完成时,调度程序需要收集其结果并重新调度(或创建新的)工作线程。在我看来,这应该是显而易见的,但我一直无法找到合适模式的示例。 Thread.join() 循环是不够的,因为那是真正的“与”逻辑,我正在寻找“或”逻辑。
我能想到的最好的办法是让调度程序线程wait() 并在完成后让工作线程notify()。虽然似乎我必须防止两个同时结束的工作线程导致调度程序线程错过notify()。另外,这对我来说似乎有点不雅。
更不优雅的是调度线程定期唤醒并轮询工作线程池并检查每个线程以查看它是否已通过isAlive() 完成。
我查看了java.util.concurrent 并没有看到任何看起来符合这种模式的东西。
我觉得要实现我上面提到的内容将涉及大量防御性编程和重新发明轮子。一定有一些我想念的东西。我可以利用什么来实现这种模式?
这是单线程版本。 putMissingToS3() 将成为调度程序线程,uploadFileToBucket() 中表示的功能将成为工作线程。
private void putMissingToS3()
{
int reqFilesToUpload = 0;
long reqSizeToUpload = 0L;
int totFilesUploaded = 0;
long totSizeUploaded = 0L;
int totFilesSkipped = 0;
long totSizeSkipped = 0L;
int rptLastFilesUploaded = 0;
long rptSizeInterval = 1000000000L;
long rptLastSize = 0L;
StopWatch rptTimer = new StopWatch();
long rptLastMs = 0L;
StopWatch globalTimer = new StopWatch();
StopWatch indvTimer = new StopWatch();
for (FileSystemRecord fsRec : fileSystemState.toList())
{
String reqKey = PathConverter.pathToKey(PathConverter.makeRelativePath(fileSystemState.getRootPath(), fsRec.getFullpath()));
LocalS3MetadataRecord s3Rec = s3Metadata.getRecord(reqKey);
// Just get a rough estimate of what the size of this upload will be
if (s3Rec == null)
{
++reqFilesToUpload;
reqSizeToUpload += fsRec.getSize();
}
}
long uploadTimeGuessMs = (long)((double)reqSizeToUpload/estUploadRateBPS*1000.0);
printAndLog("Estimated upload: " + natFmt.format(reqFilesToUpload) + " files, " + Utils.readableFileSize(reqSizeToUpload) +
", Estimated time " + Utils.readableElapsedTime(uploadTimeGuessMs));
globalTimer.start();
rptTimer.start();
for (FileSystemRecord fsRec : fileSystemState.toList())
{
String reqKey = PathConverter.pathToKey(PathConverter.makeRelativePath(fileSystemState.getRootPath(), fsRec.getFullpath()));
if (PathConverter.validate(reqKey))
{
LocalS3MetadataRecord s3Rec = s3Metadata.getRecord(reqKey);
//TODO compare and deal with size mismatches. Maybe go and look at last-mod dates.
if (s3Rec == null)
{
indvTimer.start();
uploadFileToBucket(s3, syncParms.getS3Bucket(), fsRec.getFullpath(), reqKey);
indvTimer.stop();
++totFilesUploaded;
totSizeUploaded += fsRec.getSize();
logOnly("Uploaded: Size=" + fsRec.getSize() + ", " + indvTimer.stopDeltaMs() + " ms, File=" + fsRec.getFullpath() + ", toKey=" + reqKey);
if (totSizeUploaded > rptLastSize + rptSizeInterval)
{
long invSizeUploaded = totSizeUploaded - rptLastSize;
long nowMs = rptTimer.intervalMs();
long invElapMs = nowMs - rptLastMs;
long remSize = reqSizeToUpload - totSizeUploaded;
double progessPct = (double)totSizeUploaded/reqSizeToUpload*100.0;
double mbps = (invElapMs > 0) ? invSizeUploaded/1e6/(invElapMs/1000.0) : 0.0;
long remMs = (long)((double)remSize/((double)invSizeUploaded/invElapMs));
printOnly("Progress: " + d2Fmt.format(progessPct) + "%, " + Utils.readableFileSize(totSizeUploaded) + " of " +
Utils.readableFileSize(reqSizeToUpload) + ", Rate " + d3Fmt.format(mbps) + " MB/s, " +
"Time rem " + Utils.readableElapsedTime(remMs));
rptLastMs = nowMs;
rptLastFilesUploaded = totFilesUploaded;
rptLastSize = totSizeUploaded;
}
}
}
else
{
++totFilesSkipped;
totSizeSkipped += fsRec.getSize();
logOnly("Skipped (Invalid chars): Size=" + fsRec.getSize() + ", " + fsRec.getFullpath() + ", toKey=" + reqKey);
}
}
globalTimer.stop();
double mbps = 0.0;
if (globalTimer.stopDeltaMs() > 0)
mbps = totSizeUploaded/1e6/(globalTimer.stopDeltaMs()/1000.0);
printAndLog("Actual upload: " + natFmt.format(totFilesUploaded) + " files, " + Utils.readableFileSize(totSizeUploaded) +
", Time " + Utils.readableElapsedTime(globalTimer.stopDeltaMs()) + ", Rate " + d3Fmt.format(mbps) + " MB/s");
if (totFilesSkipped > 0)
printAndLog("Skipped Files: " + natFmt.format(totFilesSkipped) + " files, " + Utils.readableFileSize(totSizeSkipped));
}
private void uploadFileToBucket(AmazonS3 amazonS3, String bucketName, String filePath, String fileKey)
{
File inFile = new File(filePath);
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.addUserMetadata(Const.LAST_MOD_KEY, Long.toString(inFile.lastModified()));
objectMetadata.setLastModified(new Date(inFile.lastModified()));
PutObjectRequest por = new PutObjectRequest(bucketName, fileKey, inFile).withMetadata(objectMetadata);
// Amazon S3 never stores partial objects; if during this call an exception wasn't thrown, the entire object was stored.
amazonS3.putObject(por);
}
【问题讨论】:
-
“收集结果”是什么意思?调度员需要做什么工作线程内无法完成的事情?
-
@shmosel 聚合所有线程的进度并智能调度新工作,其中可能包括减少或增加工作线程的数量。
-
这不是意味着等待所有线程完成吗?
-
@shmosel 否。最终所有线程都会完成,但这项工作可能会运行数周。所以每个单独的线程只会知道它在更大工作负载中的一小部分。在此期间,可能需要调整执行线程的数量以利用可用的网络带宽或对其进行限制。此外,包括重新启动失败线程在内的整体工作负载管理只能由调度程序完成。所以还有更多工作要做,只是等待线程完成。
标签: java multithreading