【问题标题】:Why Java BlockingQueue virtually stops growing up in my program?为什么 Java BlockingQueue 在我的程序中几乎停止增长?
【发布时间】:2014-07-07 19:38:04
【问题描述】:

我有一个用 Java 编写的文件处理程序。处理后的数据将被放入一个 LinkedBlockingQueue 供消费者使用。在调试时,我停止了所有消费者,只是为了让它变得简单。我注意到几乎每次阻塞队列的大小达到 3600ish 左右时,一切都变得越来越慢,然后几乎停止(程序没有退出,但不再有队列大小输出,也没有处理任何文件)没有任何内存不足异常。从资源监视器中,我看到内存水平保持非常稳定。

有人知道可能的原因是什么吗?

//下面是代码: 在主要:

_taskQueue = new BlockingTaskDQueue<StreamQueueItem>();
_fileProcessor = new LogFileProcessor(_taskQueue);
(new Thread(_fileProcessor,LogFileProcessor.myName)).start();

日志文件处理器:

public class LogFileProcessor implements Runnable {
public static final String myName = ConfigConstants.THREAD_NAME_PREFIX + "FileProcessor";

private BlockingTaskDQueue<StreamQueueItem> mTaskQueue;

private ExecutorService mThreadPool;

//indicate whether the thread pool has been shut down
private boolean mIsEnded = false;

public LogFileProcessor(BlockingTaskDQueue<StreamQueueItem> _taskQueue) {
    super();
    this.mTaskQueue = _taskQueue;


    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(ConfigConstants.THREAD_NAME_PREFIX + "FileProc-%d").build();
    this.mThreadPool =Executors.newSingleThreadExecutor(namedThreadFactory);
}

@Override
public void run() {

    File[] files = null;

    //load files in the live_folder
    files = ConfigConstants.LIVE_FOLDER.listFiles();
    if(files!=null){
        for(File f:files){
            FileProcTask task = new FileProcTask(f.getName());
            mThreadPool.submit(task);
        }
    }

    // Filter which does not accept .tmp files
    FileExtensionFilter filter = new FileExtensionFilter("tmp");
    while (true) {
        // get the non-temp txt file list
        files = ConfigConstants.PRODUCTION_FOLDER.listFiles(filter);

        if (files.length == 0) {
            continue;
        }

        for (File f : files) {
            //move file to the live folder first, then start the task
            FileOperator.move(f, ConfigConstants.LIVE_FOLDER);
            FileProcTask task = new FileProcTask(f.getName());
            mThreadPool.submit(task);
        }


    }

  }

}

内部 FileProcTask 类:

    class FileProcTask implements Runnable {
    private File mFile;
    private String mCurrentFileName;
    private HashMap<LogReason, List<String[]>> mTableRowMap;

    public FileProcTask(String fileName) {
        mCurrentFileName = fileName;
        mFile = new File(ConfigConstants.LIVE_FOLDER.getAbsolutePath()+ File.separator+fileName);
        this.mTableRowMap = new HashMap<LogReason, List<String[]>>();
    }

    @Override
    public void run() {     
        boolean success = true;         
        FileInputStream fis;
        BufferedReader reader=null;
        try {
            fis = new FileInputStream(mFile);
            reader = new BufferedReader(new InputStreamReader(fis));
            String recordItem = reader.readLine(); // ignore the first line of log file
            LogReason logReason= LogReason.Unknown;
            while(recordItem!=null){
                recordItem = reader.readLine();
                ConfigConstants.IncRowsScanned();
                logReason = this.getLogReason(recordItem);
                if(logReason != LogReason.Unknown && logReason != null){
                     insertToQueue(mCurrentFileName,logReason
                                    ,this.getColumns(recordItem));
                }
            }
        } catch (FileNotFoundException e1) {
            success = false;
            e1.printStackTrace();
        } catch (IOException e) {
            success = false;
            e.printStackTrace();
        }finally{
            if (reader!=null)
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
        if(success){
            // move file to archive folder
            String moveToFolder = FileOperator.getArchiveFolder(mFile);
            FileOperator.move(mFile, new File(moveToFolder));

            //flush all the rest rows into stream task queue
            List<String[]> list_tmp = null;
            for (LogReason lr_tmp : mTableRowMap.keySet()) {
                list_tmp = mTableRowMap.get(lr_tmp);
                if (list_tmp != null && list_tmp.size() > 0) {
                    synchronized(mTaskQueue){
                         try {
                            mTaskQueue.putNewTask(new StreamQueueItem(lr_tmp,
                                                   new ArrayList<String[]>(list_tmp)));
                            list_tmp = null;
                            System.out.println(mTaskQueue.size());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }

        //long now = System.nanoTime();
        //ConfigConstants.addToInterval(now-start);
        //System.out.println(mCurrentFileName + " process done##");
        }else{
            System.out.println(mCurrentFileName + " move back##");
            FileOperator.move(mFile, ConfigConstants.PRODUCTION_FOLDER);
            System.out.format("$$ File %s processing failed, move it back to production folder for re-try",mCurrentFileName);
        }
    }

    private void insertToQueue(String currentFileName, LogReason logreason, String[] columnValues) {
        if(columnValues==null)
            return;
        List<String[]> currentList = mTableRowMap.get(logreason);
        if(currentList==null){
            currentList = new ArrayList<String[]>();
            mTableRowMap.put(logreason, currentList);
        }
        if (currentList.size() >= ConfigConstants.ROWS_PER_REQUEST) {
            // when the current list reach the request quota, put the Bigquery
            // Row list into the blocked stream queue.
            synchronized(mTaskQueue){
                try {
                    mTaskQueue.putNewTask(new StreamQueueItem(logreason, new ArrayList<String[]>(currentList)));
                    currentList.clear();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        } 
        currentList.add(columnValues);
    }
}  

在最后一个类中,有“System.out.println(mTaskQueue.size());”的语句。这是我尝试输出队列大小的地方。

【问题讨论】:

  • 可能是代码有问题。你为什么不分享它:它可以让我们找到问题,而不是猜测它可能是什么。
  • 请参阅 BlockingQueue 文档,特别是如果您使用 put() 添加到它:“BlockingQueue 可能是容量有限的。在任何给定时间它可能有一个剩余容量,超出该容量没有其他元素可以不受阻碍地放置。”您可能会达到导致阻塞的容量。如果您尝试 offer() 或 add() ,则如果失败,则会收到异常(这将明确回答您的问题)。
  • @user1676075 听取您的建议,我尝试使用 offer 并打印出结果。尽管如此,当我看到“几乎停止”时,它是真的,但只是逐渐变慢并且没有打印出来......(至少在很长一段时间内)
  • @JBNizet 添加代码:) thx
  • LinedBlockingQueue 不应该是容量有界的,不是吗? (除非它达到 Integer.Max_Value 或让 JVM 内存不足。)

标签: java performance blockingqueue


【解决方案1】:

感谢您的建议。我最终交叉引用了 JVM 内存模型。当 BlockingQueue 在达到其容量之前尝试放入更多项目时,它会与其他线程(应用程序线程或我的应用程序调用的系统本机线程)竞争 JVM 堆空间。因此,当这种竞争发生时,程序会卡在某个地方,当 GC 无法找到任何可用的堆空间并出现 OutofMemory 异常时,程序就会崩溃。而且,这个异常会在程序卡住一段时间后发生。我想这就是为什么我在队列 put/offer 中没有看到任何异常的原因。

【讨论】:

    猜你喜欢
    • 2018-10-30
    • 2018-02-15
    • 2022-01-15
    • 2019-02-22
    • 1970-01-01
    • 2021-10-31
    • 2014-03-02
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多