【问题标题】:Reading Multiple Files Parallel into a buffer将多个文件并行读入缓冲区
【发布时间】:2014-02-14 05:44:36
【问题描述】:

我在一个项目中工作,我必须读取一组文件并将其放入缓冲区中。列表由小文件和大文件组成。我必须阅读这些文件,为了提高效率,我尝试在多个线程。每个线程将从文件名向量中获取一个文件并开始读取它并将其放入缓冲区中,这些缓冲区必须放入队列中。我碰巧在程序中有一些错误,我不知道到底在哪里在我的程序中出现错误也不知道为什么?请帮助我在我的逻辑或代码中是否有任何错误以及如何纠正它。在此先感谢

using namespace std;
#define MAX_THREADS 2
#define BUFFER_SIZE 8388608
vector<string>files;
deque<string>bufferq;
CRITICAL_SECTION Readlock;
int count = 0;

DWORD WINAPI ReadThread(LPVOID s);
int main(int argc,char *argv[])
{
    HANDLE ReadT[MAX_THREADS];

    char *filelist[5];
    DWORD threadid;
    filelist[0] = "1.txt";
    filelist[1] = "cloudy.jpg";
    filelist[2] = "connectify.exe";
    filelist[3] = "VMware.exe";
    filelist[4] = "Sherlock.mp4";
    for(int i=0;i<5;i++)
        files.push_back(filelist[i]);

    InitializeCriticalSection(&Readlock);
    long t1 = GetTickCount();
    for(int k = 0; k< MAX_THREADS; k++)
        ReadT[k] = CreateThread(NULL,0,ReadThread,NULL,NULL,&threadid);

    WaitForMultipleObjects(MAX_THREADS,ReadT,TRUE,INFINITE);
    cout << " Time Taken "<< GetTickCount()-t1 << "ms" ;

    system("pause");
    return 0;
}
DWORD WINAPI ReadThread(LPVOID s)
{
    long pending = 0;
    //int freespace = BUFFER_SIZE;
    char *filename = new char[50];
    char fsize[10];
    string file;
    char *buf;
    buf = new char[BUFFER_SIZE];
    long filesize = 0;
    int numfiles = files.size();
    int filled = 0;
    int i = 0;
    FILE *fp;
    char* ptr;
    ptr = buf;

    while(true)
    {
        EnterCriticalSection(&Readlock);
        if(files.empty())
        {
            LeaveCriticalSection(&Readlock);
            break;
        }
        else
        {
            file = files.front();
            files.erase(files.begin());
            LeaveCriticalSection(&Readlock);
        }
        bool buff_full = false;
        buf = ptr;
        int freespace = BUFFER_SIZE;
        memset(buf,0,BUFFER_SIZE);
        if(!buff_full)
        {
            if(pending == 0)
            {
                fp = fopen(file.c_str(),"rb");
                if(!fp)
                    {
                        cout<<"\nNo such file";
                        cout<<files[i];
                        system("pause");
                        return 0;
                    }
                int r1 =fseek(fp, 0L, SEEK_END);
                filesize = ftell(fp);
                int r2 =fseek(fp, 0L, SEEK_SET);
                sprintf(fsize, "%ld", filesize);
                if(freespace >= (strlen(fsize) + strlen(file.c_str()) + 2))
                {
                    count++;
                    memcpy(buf, file.c_str(), strlen(file.c_str())+1);
                    freespace = freespace - strlen(file.c_str()) - 1;
                    buf += strlen(file.c_str()) + 1;
                    memcpy(buf,fsize,strlen(fsize)+1);
                    buf += strlen(fsize) + 1;
                    freespace = freespace - strlen(fsize) - 1;
                    cout<<"Files read is "<<count<<"\n";
                    if(freespace == 0)
                    {
                        buff_full = true;
                        pending = filesize;
                        break;
                    }
                }
                else
                {
                    filled = BUFFER_SIZE - freespace;
                    fclose(fp);
                    break;
                }
                if(freespace >= filesize)
                {
                    fread(buf, 1, filesize, fp);
                    buf += filesize;
                    freespace = freespace - filesize;
                    bufferq.push_back(buf);
                    //cout << "pop"<<bufferq.size();
                    //i++;
                    if(files.empty())
                    {
                        filled = BUFFER_SIZE - freespace;
                        fclose(fp);
                        break;
                    }
                    fclose(fp);
                }
                else
                {
                    fread(buf, 1, freespace, fp);
                    bufferq.push_back(buf);
                    //cout <<"pop "<<bufferq.size();
                    buff_full = true;
                }
            }
            else
            {
                if(freespace >= pending)
                {
                    fread(buf, 1, pending, fp);
                    bufferq.push_back(buf);
                    freespace = freespace - pending;
                    pending = 0;
                    //i++;
                    if(files.empty())
                    {
                        filled = BUFFER_SIZE - freespace;
                        fclose(fp);
                        break;
                    }
                    if(freespace > 0)
                        buf += pending;
                    else
                        buff_full = true;
                    fclose(fp);
                }
                else
                {
                    fread(buf, 1, freespace, fp);
                    bufferq.push_back(buf);
                    cout << bufferq.size();
                    pending = pending - freespace;
                    buff_full = true;
                }
            }
        }
        if(buff_full)
        {
            buf = ptr;
            cout << "popping buffer " << bufferq.size();
            //bufferq.pop_back();
        }
    }
    return 0;
}

【问题讨论】:

  • 首先你有未使用的变量(例如,你用filename做什么?)。其次,添加到bufferq 时会出现竞争条件。第三,你使用旧的 C 风格 FILE 函数而不是 C++ 流有什么原因吗?
  • @JoachimPileborg 我真的不知道发生了什么竞争情况。它对于适合缓冲区的文件非常正常。仅当我将文件添加到大小恰好大于缓冲区的向量中时才会发生错误。没有理由使用 C 流。你能解释一下吗?
  • 对于竞态条件,如果多个线程同时尝试向bufferq添加字符串怎么办?至于缓冲区的问题,我建议您在调试器中运行并逐行执行代码。至于我对未使用变量的评论,您拥有的杂乱和未使用的代码越少,就越容易更好地了解正在发生的事情,这将有助于其他人阅读您的代码包括您自己未来。
  • @JoachimPileborg ..我尝试使用调试器。我发现此错误“运行时检查失败 #2 - 变量 'fsize' 周围的堆栈已损坏。”我不知道它是否完美运行对于适合缓冲区的文件和大文件,它显示错误。我认为问题出在我放置的循环中。你能建议需要更改吗?对于队列,因为它是一个全局的,我必须锁定并推入它的仪式?
  • 然后你写超出了一些堆栈变量的边界,可能是fsize数组。在调试器中单步执行代码时,请注意所有变量及其值。如果你停止使用旧的 C 风格的字符串和数组,而只使用 C++ std::stringstd::vector 等,许多这些问题都可以避免。

标签: c++ multithreading winapi filesystems buffer


【解决方案1】:

在大文件发生错误的情况下,我认为这行可能会导致问题

sprintf(fsize, "%ld", filesize);

fsize 是 char[10],如果文件大小 >= 1,000,000,000,您将用尾随 0 覆盖 fsize 数组。这将导致“运行时检查失败 #2 - 变量 'fsize' 周围的堆栈已损坏。” ,正如你所写。请检查您的测试文件的大小。

其中,您正在 i 上循环填充文件,然后您写道:

files.erase(files.begin());
// ...
cout<<"\nNo such file";
cout<<files[i];

files[i] 在您删除它们时已经指向另一个元素,如果文件在最后一次迭代中为空,则会导致崩溃。

如果你不将文件和fsize复制到bufferq,你又是为了什么?

由于 bufferq 是可写的并且在线程之间共享,所以对它的访问应该受到锁的保护,你选择的关键部分。

这是我的小代码审查。

【讨论】:

  • @Alex...谢谢现在我真正需要的东西。复制文件及其大小用于存储元数据。你能告诉我用什么方法可以改变我的循环,以便它可以有效地读取大小文件并将其插入队列?
  • "复制文件及其大小是为了存储元数据" - 很好,但你不要将它作为 bufferq.push_back(buf) 复制到 bufferq;不是从开头复制缓冲区,而是从向前移动的 buf 指针复制缓冲区。根本不使用这些元数据。为简单起见更改为 fsize[32] - 这应该可以修复堆栈帧的错误。
  • @Alex...非常感谢..但是请告诉我如何优化多线程读取缓冲区以实现有效的并行性?
  • 这是另一个相当大的问题! :)
  • 您的逻辑乍一看是可以接受的,但请锁定所有共享的可写数据结构。另外,在这里:fread(buf, 1, freespace, fp);bufferq.push_back(buf); 你不认为我们需要从 buf 准备 C 字符串吗? fread(buf, 1, freespace-1, fp);buf[freespace-1] = 0;
猜你喜欢
  • 2018-10-02
  • 2018-07-21
  • 2019-11-26
  • 1970-01-01
  • 1970-01-01
  • 2021-11-30
  • 2023-03-29
  • 2014-01-06
  • 1970-01-01
相关资源
最近更新 更多