【发布时间】:2014-07-07 19:38:04
【问题描述】:
我有一个用 Java 编写的文件处理程序。处理后的数据将被放入一个 LinkedBlockingQueue 供消费者使用。在调试时,我停止了所有消费者,只是为了让它变得简单。我注意到几乎每次阻塞队列的大小达到 3600ish 左右时,一切都变得越来越慢,然后几乎停止(程序没有退出,但不再有队列大小输出,也没有处理任何文件)没有任何内存不足异常。从资源监视器中,我看到内存水平保持非常稳定。
有人知道可能的原因是什么吗?
//下面是代码: 在主要:
_taskQueue = new BlockingTaskDQueue<StreamQueueItem>();
_fileProcessor = new LogFileProcessor(_taskQueue);
(new Thread(_fileProcessor,LogFileProcessor.myName)).start();
日志文件处理器:
public class LogFileProcessor implements Runnable {
public static final String myName = ConfigConstants.THREAD_NAME_PREFIX + "FileProcessor";
private BlockingTaskDQueue<StreamQueueItem> mTaskQueue;
private ExecutorService mThreadPool;
//indicate whether the thread pool has been shut down
private boolean mIsEnded = false;
public LogFileProcessor(BlockingTaskDQueue<StreamQueueItem> _taskQueue) {
super();
this.mTaskQueue = _taskQueue;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(ConfigConstants.THREAD_NAME_PREFIX + "FileProc-%d").build();
this.mThreadPool =Executors.newSingleThreadExecutor(namedThreadFactory);
}
@Override
public void run() {
File[] files = null;
//load files in the live_folder
files = ConfigConstants.LIVE_FOLDER.listFiles();
if(files!=null){
for(File f:files){
FileProcTask task = new FileProcTask(f.getName());
mThreadPool.submit(task);
}
}
// Filter which does not accept .tmp files
FileExtensionFilter filter = new FileExtensionFilter("tmp");
while (true) {
// get the non-temp txt file list
files = ConfigConstants.PRODUCTION_FOLDER.listFiles(filter);
if (files.length == 0) {
continue;
}
for (File f : files) {
//move file to the live folder first, then start the task
FileOperator.move(f, ConfigConstants.LIVE_FOLDER);
FileProcTask task = new FileProcTask(f.getName());
mThreadPool.submit(task);
}
}
}
}
内部 FileProcTask 类:
class FileProcTask implements Runnable {
private File mFile;
private String mCurrentFileName;
private HashMap<LogReason, List<String[]>> mTableRowMap;
public FileProcTask(String fileName) {
mCurrentFileName = fileName;
mFile = new File(ConfigConstants.LIVE_FOLDER.getAbsolutePath()+ File.separator+fileName);
this.mTableRowMap = new HashMap<LogReason, List<String[]>>();
}
@Override
public void run() {
boolean success = true;
FileInputStream fis;
BufferedReader reader=null;
try {
fis = new FileInputStream(mFile);
reader = new BufferedReader(new InputStreamReader(fis));
String recordItem = reader.readLine(); // ignore the first line of log file
LogReason logReason= LogReason.Unknown;
while(recordItem!=null){
recordItem = reader.readLine();
ConfigConstants.IncRowsScanned();
logReason = this.getLogReason(recordItem);
if(logReason != LogReason.Unknown && logReason != null){
insertToQueue(mCurrentFileName,logReason
,this.getColumns(recordItem));
}
}
} catch (FileNotFoundException e1) {
success = false;
e1.printStackTrace();
} catch (IOException e) {
success = false;
e.printStackTrace();
}finally{
if (reader!=null)
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(success){
// move file to archive folder
String moveToFolder = FileOperator.getArchiveFolder(mFile);
FileOperator.move(mFile, new File(moveToFolder));
//flush all the rest rows into stream task queue
List<String[]> list_tmp = null;
for (LogReason lr_tmp : mTableRowMap.keySet()) {
list_tmp = mTableRowMap.get(lr_tmp);
if (list_tmp != null && list_tmp.size() > 0) {
synchronized(mTaskQueue){
try {
mTaskQueue.putNewTask(new StreamQueueItem(lr_tmp,
new ArrayList<String[]>(list_tmp)));
list_tmp = null;
System.out.println(mTaskQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//long now = System.nanoTime();
//ConfigConstants.addToInterval(now-start);
//System.out.println(mCurrentFileName + " process done##");
}else{
System.out.println(mCurrentFileName + " move back##");
FileOperator.move(mFile, ConfigConstants.PRODUCTION_FOLDER);
System.out.format("$$ File %s processing failed, move it back to production folder for re-try",mCurrentFileName);
}
}
private void insertToQueue(String currentFileName, LogReason logreason, String[] columnValues) {
if(columnValues==null)
return;
List<String[]> currentList = mTableRowMap.get(logreason);
if(currentList==null){
currentList = new ArrayList<String[]>();
mTableRowMap.put(logreason, currentList);
}
if (currentList.size() >= ConfigConstants.ROWS_PER_REQUEST) {
// when the current list reach the request quota, put the Bigquery
// Row list into the blocked stream queue.
synchronized(mTaskQueue){
try {
mTaskQueue.putNewTask(new StreamQueueItem(logreason, new ArrayList<String[]>(currentList)));
currentList.clear();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
}
currentList.add(columnValues);
}
}
在最后一个类中,有“System.out.println(mTaskQueue.size());”的语句。这是我尝试输出队列大小的地方。
【问题讨论】:
-
可能是代码有问题。你为什么不分享它:它可以让我们找到问题,而不是猜测它可能是什么。
-
请参阅 BlockingQueue 文档,特别是如果您使用 put() 添加到它:“BlockingQueue 可能是容量有限的。在任何给定时间它可能有一个剩余容量,超出该容量没有其他元素可以不受阻碍地放置。”您可能会达到导致阻塞的容量。如果您尝试 offer() 或 add() ,则如果失败,则会收到异常(这将明确回答您的问题)。
-
@user1676075 听取您的建议,我尝试使用 offer 并打印出结果。尽管如此,当我看到“几乎停止”时,它是真的,但只是逐渐变慢并且没有打印出来......(至少在很长一段时间内)
-
@JBNizet 添加代码:) thx
-
LinedBlockingQueue 不应该是容量有界的,不是吗? (除非它达到 Integer.Max_Value 或让 JVM 内存不足。)
标签: java performance blockingqueue