【问题标题】:Read multiple files using multiple threads in c [closed]在c中使用多个线程读取多个文件[关闭]
【发布时间】:2016-11-28 03:48:44
【问题描述】:
#include <dirent.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <semaphore.h>


int file_index = 0;         // index for array[500];


struct webData {
    char web_names [255];           
};


void *thread(void *wData_element)
{
    struct webData *temp = wData_element;

    FILE *fp; 
    char line[255]="";      // hold each line;
    fp = fopen(temp->web_names, "r");

    if(fp == NULL)
    {
        perror("Error: File open failure.");
    }
    else
    {
        fgets(line,255, fp);
        printf("%s\n", line);
    }
    fclose(fp);
    return NULL;
}

int main(int argc, char const* argv[]) 
{


    DIR * dir_pointer;          // define a dir pointer;
    struct dirent * entry;      // entry under dir;
    //char *dir = "./data/";
    dir_pointer = opendir("./data/"); // assign dir location into dir pointer


    // declare the struct wData array for each file. 
    struct webData wData[500];
    // declare the threads array.
    pthread_t tid_array[500];


    while( (entry = readdir(dir_pointer)) != NULL)
    {

        if(entry->d_type == DT_REG) // avoid the . and .. dir;
        {

            char full_path[255];     
            full_path[0] = '\0';    // initilize the string;

            strcat(full_path, "./data/");  // concatenate file directory;
            strcat(full_path, entry->d_name);    // concatenate filename;
            strcpy(wData[file_index].web_names, full_path); // store file name into web_names array;


            pthread_create(&tid_array[file_index], NULL, thread, &wData[file_index]);



            file_index++;   // increase the file index for next file.


        }


    }


    for(int i=0; i<500; i++)      
    {
        pthread_join(tid_array[i], NULL);
    }


    return 0;
}

对于这个程序:

data文件夹中有500个文件。

对于每个文件,我创建一个线程来对文件执行一些操作。

在我迭代所有 500 个文件之后。我加入所有线程。

我的问题是:

如何创建 10 个线程,每个线程对 50 个文件执行一些操作?

如何确保每个线程只处理 50 个文件,因为它们同时运行?

例如:

线程 1 处理 1-50 之间的文件编号

线程 2 处理 51-100 的文件编号

.

.

.

非常感谢任何相关来源或示例。

【问题讨论】:

  • 您是否在构建过程中链接pthread
  • @danielu13 谢谢,它现在可以工作了。我的问题呢?
  • 我现在在移动设备上,所以我无法给出详细的答案,但您需要创建一个线程安全的文件队列或每个线程从中读取的文件名。如果一个线程尝试从队列中读取并且它是空的,那么该线程返回并加入。为了更加灵活,您可能希望考虑通过网络执行此操作。您可以使用套接字,或者我非常偏爱 ZeroMQ 来完成此类任务。还有其他类似的库,您可以创建网络队列。对于线程,您可以让它在本地连接并提供连接到它的线程。
  • 如果你走 ZeroMQ 路线,他们有一个非常好的在线指南,这是它的主要文档来源。查看该指南中的 PUSH/PULL 或管道,它可能会对您有所帮助。一些更高级别的 ZMQ 绑定还具有一些更高级的线程功能。请注意,这确实会带来对另一个库的依赖,您可能想要也可能不想要。
  • 您应该向每个线程函数传递一个不同的参数,告诉它要处理哪些文件。然后,您的线程函数将只处理它被告知要处理的文件(您编写了顺从的函数,不是吗?)。如果您正在扫描目录,每个线程都会打开它,跳过 ... 条目,然后在开始处理文件之前跳过适当数量的其他条目。

标签: c multithreading thread-safety


【解决方案1】:

首先你为线程声明一个参数结构

typedef struct thread_param_s {
     // each thread will get an array of webData-files
     struct webData* data;
     // number of elements
     int n;    
} thread_param_t;

您为每个线程创建此参数结构,相应地填充它并将其传递给pthread_create 而不是wData*

现在你调整你当前的代码

#include <dirent.h>
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <semaphore.h>


int file_index = 0;         // index for array[500];


struct webData {
    char web_names [255];           
};


void *thread(void *param)
{
    thread_param_t* thread_param = (thread_param_t*)param;
    int i;
    // iterate through all files
    for (i = 0; i < thread_param->n; i++) {
        struct webData *temp = thread_param->data + i;

        FILE *fp; 
        char line[255]="";      // hold each line;
        fp = fopen(temp->web_names, "r");

        if(fp == NULL)
        {
            perror("Error: File open failure.");
        }
        else
        {
            fgets(line,255, fp);
            printf("%s\n", line);
        }
    }
    return NULL;
}

int main(int argc, char const* argv[]) 
{


    DIR * dir_pointer;          // define a dir pointer;
    struct dirent * entry;      // entry under dir;
    //char *dir = "./data/";
    dir_pointer = opendir("./data/"); // assign dir location into dir pointer


    // declare the struct wData array for each file. 
    struct webData wData[500];
    // declare the threads array.



    while( (entry = readdir(dir_pointer)) != NULL)
    {

        if(entry->d_type == DT_REG) // avoid the . and .. dir;
        {

            char full_path[255];     
            full_path[0] = '\0';    // initilize the string;

            strcat(full_path, "./data/");  // concatenate file directory;
            strcat(full_path, entry->d_name);    // concatenate filename;
            strcpy(wData[file_index].web_names, full_path); // store file name into web_names array;

            file_index++;   // increase the file index for next file.
            // just fill wData here

        }


    }

    pthread_t tid_array[10];
    thread_param_t thread_param[10];
    int thread_counter = 0;

    // number of files for each thread
    int step = file_index / 10;
    int i;

    // create all threads
    for(i = 0; i < 9; i++)      
    {
       thread_param[i].n = step;
       thread_param[i].data = wData + step * i;

       pthread_create(&tid_array[i], NULL, thread, thread_param + i);
    }
    // the last thread may get more data, because of integer rounding
    thread_param[i].n = file_index - step * i;
    thread_param[i].data = wData + step * i;

    pthread_create(&tid_array[i], NULL, thread, thread_param + i);



    for(int i=0; i<10; i++)      
    {
        pthread_join(tid_array[i], NULL);
    }


    return 0;
}

【讨论】:

    猜你喜欢
    • 2013-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-28
    相关资源
    最近更新 更多