【问题标题】:Thread synchronization - when to terminate the thread?线程同步 - 何时终止线程?
【发布时间】:2017-04-27 07:28:00
【问题描述】:

我正在编写一个需要一些输入的程序;一个目录,一个文件名和一些标志。该程序的目的是在给定目录中搜​​索给定文件。在搜索时,如果它找到另一个目录,它将打开该目录并继续在那里搜索。其中一个标志允许用户选择程序将使用多少线程来搜索文件。

目录存储在堆栈中,我遇到的问题是线程之间的同步。我目前正在使用互斥锁和定时等待条件。这意味着如果线程等待了一定时间并且存储目录的堆栈为空,则线程将结束。问题是,当只运行 2 个线程时,1 个线程可能会完成所有工作,即打开 400 个目录,而另一个打开 0 个。

所以我的问题是......我怎样才能以更好的方式同步我的线程?也许不使用定时等待条件?线程应该什么时候终止?

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <dirent.h>
#include <getopt.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#include <pthread.h>

void search_func(char *path, char *name, int d, int f, int l);
void *thread_func(void *arg);
void push(char *data);
char* pop();
#define MAXLENGTH 1000
#define MAXSIZE 10000
#define WAIT_TIME_SECONDS 0.1
pthread_mutex_t lock;
pthread_cond_t count_threshold_cv;
struct stack
{
    char stk[MAXSIZE][MAXLENGTH];
    int top;
};
typedef struct stack STACK;
STACK s;

struct arg_keeper {
    char **argv;
    int argc;
    int d;
    int f;
    int l;
};

int main(int argc, char **argv) {
    if(argc < 3) {
        fprintf(stderr, "Not enough arguments\n");
        return 1;
    }
    char *xValue = NULL;
    int x;
    int d = 0;
    int f = 0;
    int l = 0;
    int nrthr = 0;
    opterr = 0;
    int thread_count = 0;
    int directory_exist = 0;
    pthread_t tid[1024];

    while ((x = getopt(argc, argv, "t:p:")) != -1) {
        switch (x) {
        case 't':
            xValue = optarg;
            if (*xValue == 'd') {
                d = 1;
            } else if (*xValue == 'f') {
                f = 1;
            } else if (*xValue == 'l') {
                l = 1;
            }
            break;
        case 'p':
            nrthr = atoi(optarg);
            if(nrthr == 0) {
                fprintf(stderr, "Invalid thread count\n");
                return 1;
            }
            break;
        case '?':
            if (isprint (optopt))
                fprintf(stderr, "Unknown option '-%c'.\n",
                                optopt);
            return 1;
        default:
            abort();
        }
    }

    if (argc >= 3) {
        int i;
        for (i = optind; i < argc - 1; i++) {
            directory_exist = 1;
            push(argv[i]);
        }
    }
    if(directory_exist == 0) {
        fprintf(stderr, "No directories entered\n");
        return 1;
    }

    struct arg_keeper * arg_struct = malloc(sizeof(*arg_struct));
    arg_struct->argv = argv;
    arg_struct->argc = argc;
    arg_struct->d = d;
    arg_struct->f = f;
    arg_struct->l = l;

    if(pthread_mutex_init(&lock, NULL) != 0) {
        fprintf(stderr, "Mutex initialisation failed\n");
        return 1;
    }
    if(pthread_cond_init(&count_threshold_cv, NULL) != 0) {
        fprintf(stderr, "Condition variable initialisation failed\n");
        return 1;
    }

    while(thread_count < nrthr - 1) {
        if(pthread_create(&(tid[thread_count++]), NULL, thread_func,
                            arg_struct) != 0)
            fprintf(stderr, "Can't create thread\n");
    }

    if(nrthr!=0)
        thread_func(arg_struct);
    else
        thread_func(arg_struct);

    int c;
    for(c = 0; c < nrthr; c++) {
        pthread_join(tid[c], NULL);
    }
    pthread_mutex_destroy(&lock);
    free(arg_struct);
    return 0;
}

void *thread_func(void *arg) {
    int dirOpened = 0;
    struct arg_keeper arg_struct = *(struct arg_keeper *)arg;
    char *data;

    pthread_mutex_lock(&lock);
    struct timespec ts;
    struct timeval tp;
    while(1) {
        gettimeofday(&tp, NULL);
        ts.tv_sec  = tp.tv_sec;
        ts.tv_nsec = tp.tv_usec * 1000;
        ts.tv_sec += WAIT_TIME_SECONDS;

        if (pthread_cond_timedwait(&count_threshold_cv, &lock, &ts) == ETIMEDOUT) {
            if (s.top) {
                data = pop();
                pthread_cond_signal(&count_threshold_cv);
                dirOpened++;
                search_func(data, arg_struct.argv[arg_struct.argc - 1], arg_struct.d,
                        arg_struct.f, arg_struct.l);
            }
            else
                break;
        }
    }
    pthread_mutex_unlock(&lock);
    fprintf(stdout, "Thread with id %lu opened %d directories\n",
                pthread_self(), dirOpened);
    return NULL;
}

void search_func(char *inPath, char *testName, int d, int f, int l) {
    char path[PATH_MAX];
    strcpy(path, inPath);
    struct dirent *pDirent;
    DIR *pDir;
    struct stat file_info;

    if ((pDir = opendir(path)) == NULL) {
        fprintf(stderr, "Error:'%s': %s\n", path, strerror(errno));
    } else {
        int v1;
        int v2;
        char *str1 = ".";
        char *str2 = "..";

        char name[PATH_MAX];
        strcpy(name, testName);

        char testPath[PATH_MAX];
        strcpy(testPath, path);

        char testPathLast[PATH_MAX];
        strcpy(testPathLast, path);

        while ((pDirent = readdir(pDir)) != NULL) {
            if (strcmp(pDirent->d_name, name) == 0 && d == 0 &&
                    f == 0 && l == 0) {
                if (path[strlen(path) - 1] != '/')
                    strcat(testPathLast, "/");

                strcat(testPathLast, pDirent->d_name);
                fprintf(stdout, "%s\n", testPathLast);
            }

            char testPath2[PATH_MAX];
            strcpy(testPath2, testPath);
            strcat(testPath2, "/");
            strcat(testPath2, pDirent->d_name);

            if (lstat(testPath2, &file_info) != 0)
                fprintf(stderr, "lstat error2: %s\n",
                            strerror(errno));

            if (d == 1) {
                if (strcmp(pDirent->d_name, name)
                    == 0 && S_ISDIR(file_info.st_mode)) {
                    if (path[strlen(path) - 1] != '/')
                        strcat(testPathLast, "/");

                    strcat(testPathLast, pDirent->d_name);
                    fprintf(stdout, "%s\n", testPathLast);
                }
            }

            if (f == 1) {
                if (strcmp(pDirent->d_name, name)
                    == 0 && S_ISREG(file_info.st_mode)) {
                    if (path[strlen(path) - 1] != '/')
                        strcat(testPathLast, "/");

                    strcat(testPathLast, pDirent->d_name);
                    fprintf(stdout, "%s\n", testPathLast);
                }
            }

            if (l == 1) {
                if (strcmp(pDirent->d_name, name)
                    == 0 && S_ISLNK(file_info.st_mode)) {
                    if (path[strlen(path) - 1] != '/')
                        strcat(testPathLast, "/");

                    strcat(testPathLast, pDirent->d_name);
                    fprintf(stdout, "%s\n", testPathLast);
                }
            }

            v1 = strcmp(pDirent->d_name, str1);
            v2 = strcmp(pDirent->d_name, str2);

            if ((v1 != 0 && v2 != 0) && S_ISDIR(file_info.st_mode)) {
                strcpy(path, testPath);
                strcpy(path, testPath);
                if (path[strlen(path) - 1] != '/')
                    strcat(path, "/");
                strcat(path, pDirent->d_name);
                push(path);
            }
        }
        closedir(pDir);
    }
}

void push(char *data)
{
    if(s.top == (MAXSIZE - 1)) {
        fprintf(stderr, "Stack is full\n");
        return;
    }
    else {
        s.top = s.top + 1;
        strcpy(&(s.stk[s.top][0]), data);
    }
    return;
}

char* pop()
{
    char *data;
    if(s.top == -1) {
        fprintf(stderr, "Stack is empty\n");
        return NULL;
    }
    else {
        data = s.stk[s.top];
        s.top = s.top - 1;
    }
    return data;
}

【问题讨论】:

  • 互斥锁保护共享资源免受并发访问。这里的共享资源是什么。它是否受到互斥锁的安全保护?即:共享资源在任何情况下都不会被并发访问(不包括并发只读访问)?
  • 我首先想到的是使用信号量和要搜索的目录列表。线程在信号量上等待要搜索的目录。当有一个时,它会搜索它,并将它在其中找到的目录添加到列表中,并发布信号量该次数。这只是生产者/消费者的情况,生产者和消费者线程是相同的。 (见link on WP

标签: c multithreading synchronization pthreads


【解决方案1】:

虽然使用 POSIX nftw() 或 BSD fts(在 Linux 中的 glibc 中都可用)更好地解决了 OP 的实现,但这个实现中的潜在问题实际上非常有趣:每个工作线程最初消耗一个数据,然后处理一段时间,并可能产生额外的基准。

问题情况是所有现有数据都被消耗,但有一个或多个工作线程,可能会产生额外的数据需要处理。因此,没有更多数据要处理并不足以成为工作线程退出的理由。仅当没有更多数据要处理时,工作线程才应该退出,并且没有工作线程可以生成额外的数据运行

显而易见的解决方案是使用互斥体、条件变量(用于等待新数据)和当前运行的工作线程数量的计数器。

假设数据存储在一个单链表中:

struct work_item {
    struct work_item *next;
    char              path[];
};

上面的path 成员是一个C99 灵活的数组成员。我们可以用来描述要完成的工作的结构可以是

struct work {
    pthread_mutex_t   mutex;
    pthread_cond_t    cond;
    long              active;
    struct work_item *item;
};
#define WORK_INITIALIZER {     \
    PTHREAD_MUTEX_INITIALIZER, \
    PTHREAD_COND_INITIALIZER,  \
    0L, NULL }

将初始项推送到item 列表后,将创建一个或多个线程,并给出指向共享struct work 结构的指针。

逻辑类似如下:

void *worker_thread(void *work_ptr)
{
    struct work *const work = (struct work *)word_ptr;
    struct work_item  *item;

    pthread_mutex_lock(&(work->mutex));

    while (1) {

        /* If there are no active workers,
           nor any work items, we're done. */
        if (!work->item && !work->active) {
            /* Ensure threads waiting on the condition
               variable are woken up, so they quit too. */
            pthread_cond_broadcast(&(work->cond));
            pthread_mutex_unlock(&(work->mutex));
            return NULL;
        }

        /* No work items left? */
        if (!work->item) {
            /* Wait for a new one to be produced,
               or a worker to notice we're done. */
            pthread_cond_wait(&(work->cond), &(work->mutex));
            continue;
        }

        /* Increment active worker count, grab an item,
           and work on it. */
        work->active++;
        item = work->item;
        work->item = work->item->next;
        item->next = NULL;

        /* Unlock mutex while working. */
        pthread_mutex_unlock(&(work->mutex));

        /*
         * TODO: Work on item
        */

        pthread_mutex_lock(&(work->mutex));
        work->active--;
    }
}

当然,在处理一项时,必须在将一项推入堆栈时重新获取互斥锁,并在条件变量上发出信号以唤醒工作线程(如果正在等待新工作):

        struct work_item *temp;

        /* TODO: Allocate and initialize temp */

        pthread_mutex_lock(&(work->mutex));
        temp->next = work->item;
        work->item = temp;
        pthread_cond_signal(&(work->cond));
        pthread_mutex_unlock(&(work->mutex));

注意活动计数器如何反映当前正在处理项目的线程数(基本上是当前运行的生产者的数量)。 不是现有工作线程的数量!

如果一个线程注意到没有更多的项目要处理,也没有任何生产者在运行,则广播条件变量,这样如果有任何线程在等待新的工作,就会被唤醒(同样注意到没有更多的工作)。每当将项目添加到工作列表时,也会向条件变量发出信号(仅唤醒一个等待线程)。

【讨论】:

  • 您将如何将其应用于我的代码?我尝试使用变量来跟踪工作线程,但它从来没有工作过。
  • @Fjodor:如果我实现了目录扫描以外的其他东西会有所帮助吗?比如说,某种字符串排列(工人消耗一个字符,并根据他们消耗的一个字符产生零、一个或两个新字符串)?我希望您(和其他阅读此答案的人)了解如何解决这种生产者-消费者问题,但如果没有显示现成的答案,您可以复制和粘贴并按原样使用。也就是说,我很乐意帮助您了解一种可能的解决方案,但您仍然必须自己将其应用于您的特定问题。
  • 我不这么认为,我的意思是,我一直试图让它工作一段时间,但我无法解决它。当然,我会阅读您的答案并将其与我其他尝试过的版本进行比较,看看我在思考过程中哪里出了问题。我想我知道如何解决它,将工作目录作为第二个参数跟踪线程应该何时终止(第一个参数是堆栈为空)。但我就是无法让它工作。
  • @Fjodor:不幸的是,我认为您的方法存在根本缺陷。没有openat(),你就不能制作一个健壮的 POSIX 目录树遍历器;使用路径仅适用于静态或只读文件系统,因为任何目录移动或重命名都会完全抛弃纯路径/基于名称的步行者。此外,这已经是一个很好解决的问题(nftw()fts()),因此不是很有趣。所以不,我不会写这样的实现,即使生产者-消费者子问题 很有趣。
【解决方案2】:

如何以更好的方式同步我的线程?也许不使用定时等待条件?

是的 - 我会删除条件变量并使用一组两个semaphores,第一个信号量计算堆栈上的挂起目录,第二个信号量计算繁忙的工作线程。为了保持工作线程简单,我会在创建的线程中完成所有搜索工作,而不是从main() 调用thread_func()。可以保留pthread_mutex_t lock 以保护STACK s 免受并发访问。

线程应该什么时候终止?

正如 Nominal Animal 所写:只有在没有更多数据需要处理且没有工作线程可以生成额外数据运行时,才应该退出工作线程。 上述信号量提供了所需的信息并允许在main() 线程中轻松等待该条件。

对您的程序的更改将是

  • 在文件范围内:

    #include <sys/sem.h>
    int semid;
    
  • push():

    void push(char *data)
    {
        pthread_mutex_lock(&lock);
        if (s.top == MAXSIZE-1)
            fprintf(stderr, "Stack is full\n");
        else
            strcpy(s.stk[++s.top], data),
            semop(semid, &(struct sembuf){0, 1}, 1);    // add 1 to "dirs on stack"
        pthread_mutex_unlock(&lock);
        return;
    }
    

    请注意,我们在这里计算第一个信号量。

  • pop():

    char *pop()
    {
        char *data;
        pthread_mutex_lock(&lock);
        if (s.top == -1)
            fprintf(stderr, "Stack is empty\n"),
            data = NULL;
        else
            data = strdup(s.stk[s.top--]);  // Don't return freed stack slot!
        pthread_mutex_unlock(&lock);
        return data;
    }
    

    请注意,我们返回数据的副本而不仅仅是指向它的指针,因为释放的栈顶可以随时被另一个线程重用和覆盖。

  • main() 目录参数被压入堆栈之前:

        // create semaphore set of 2 sems: [0] dirs on stack, [1] threads at work
        semid = semget(IPC_PRIVATE, 2, S_IRWXU);
        semctl(semid, 0, SETALL, (unsigned short [2]){});   // zero the sem values
    

    请注意,这必须放在第一次调用 push() 之前,以便它可以计算信号量。

  • main() 中创建线程并调用thread_func()

        while (thread_count < nrthr)
            if (pthread_create(&tid[thread_count++], NULL, thread_func, arg_struct))
                fprintf(stderr, "Can't create thread\n");
    
        // wait until no more dirs on stack and no more threads at work
        semop(semid, (struct sembuf []){{0, 0}, {1, 0}}, 2);
        semctl(semid, 0, IPC_RMID); // remove the semaphores, make threads finish
    

    请注意,我们创建nrthr 而不是nrthr - 1 线程,因为main() 线程不参与工作,它只是等待所有工作完成。然后,它会破坏信号量集,从而导致工作线程退出循环并完成(见下文)。

  • thread_func():

    void *thread_func(void *arg)
    {
        int dirOpened = 0;
        struct arg_keeper arg_struct = *(struct arg_keeper *)arg;
        char *data;
        // wait for work, subtract 1 from dirs on stack and add 1 to threads at work
        while (semop(semid, (struct sembuf []){{0, -1}, {1, 1}}, 2) == 0)
        {   // this loop ends when semid is removed
            data = pop();
            dirOpened++;
            search_func(data, arg_struct.argv[arg_struct.argc-1],
                        arg_struct.d, arg_struct.f, arg_struct.l);
            free(data);
            semop(semid, &(struct sembuf){1, -1}, 1);   // "threads at work" -= 1
        }
        fprintf(stdout, "Thread with id %lu opened %d directories\n",
                pthread_self(), dirOpened);
        return (void *)dirOpened;
    }
    

    注意semop()main() 破坏信号量集时返回-1,因此循环结束。另请注意,我们释放了在pop() 中分配的数据副本。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多