【问题标题】:Implement barrier with pthreads on C在 C 上使用 pthread 实现屏障
【发布时间】:2021-08-30 18:58:02
【问题描述】:

我正在尝试并行化合并排序算法。我正在做的是为每个线程划分输入数组,然后合并线程结果。我试图合并结果的方式是这样的:

thread 0                     |   thread 1        |   thread 2         |   thread 3

sort(A0)                     |   sort(A1)        |   sort(A2)         | sort(A3)
merge(A0,A1)                 |                   |   merge(A2,A3)     | 
merge(A0A1, A2A3)            |                   |                    |

所以,在我的函数sortManager 结束时,我调用了应该实现上述逻辑的函数mergeThreadResults。在其中我迭代对以合并相应的线程。然后,如果需要,我将最后一个项目合并到线程 0。它看起来像这样:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

它似乎按预期工作。问题是,我使用sleep 函数来同步线程,这远非最佳方法。所以我正在尝试用 pthread 实现一个屏障。
在其中,我尝试计算该循环需要多少次迭代并将其传递为breakpoint。当所有线程都在同一点时,我释放合并功能并在新的循环中再次等待。这是我尝试过的:

        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

但它没有按预期工作。一些merge 在最后一个循环完全结束之前触发,给我留下了一个部分排序的数组。

这是我的测试代码的一个小例子:

#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>

#include <pthread.h>
#include <unistd.h>

// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;

struct ThreadTask {
    long rank;
    int size;
    int threads;
};

void merge(int arr[], int left, int mid, int right) {
    /* Merge arrays */

    int i, j, k;
    int n1 = mid - left + 1;
    int n2 = right - mid;

    // Alocate temp arrays
    int *L = malloc((n1 + 2) * sizeof(int));
    int *R = malloc((n2 + 2) * sizeof(int));
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
        exit(EXIT_FAILURE);
    }

    // Populate temp arrays
    for (i = 1; i <= n1; i++) {
        L[i] = arr[left + i - 1];
    }
    for (j = 1; j <= n2; j++) {
        R[j] = arr[mid + j];
    }

    L[n1 + 1] = INT_MAX;
    R[n2 + 1] = INT_MAX;
    i = 1;
    j = 1;

    // Merge arrays
    for (k = left; k <= right; k++) {
        if (L[i] <= R[j]) {
            arr[k] = L[i];
            i++;
        } else {
            arr[k] = R[j];
            j++;
        }
    }

    free(L);
    free(R);
}


void mergeSort(int arr[], int left, int right) {
    /* Sort and then merge arrays */

    if (left < right) {
        int mid = left + (right - left) / 2;

        mergeSort(arr, left, mid);
        mergeSort(arr, mid + 1, right);

        merge(arr, left, mid, right);
    }
}


void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {

    int nextThread;
    int iter = 2;
    while (iter <= threads) {
        int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;

        printf("Merging threads %ld to %d\n", myRank, nextThread);
        
        if (myRank % iter != 0) {
            break;
        }

        // barrier
        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(2); // <- sleep

        myRight = nextThreadRight;
        iter = iter * 2;
    }

     if (myRank == 0 && nextThread < threads-1) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
     }

}

void *sortManager(void *threadInfo) {
    /* Manage mergeSort between threads */

    struct ThreadTask *currentTask = threadInfo;

    // Get task arguments
    long rank = currentTask->rank;
    int left= rank * ((float)currentTask->size / (float)currentTask->threads);
    int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
    int mid = left + (right - left) / 2;

    // Execute merge for task division
    if (left < right) {
        mergeSort(sortingArray, left, mid);
        mergeSort(sortingArray, mid + 1, right);
        merge(sortingArray, left, mid, right);
    }

    // Merge thread results
    if (rank % 2 == 0)  {
        mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
    }

    return 0;
}


struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    /* Create threads with each task info */

    struct ThreadTask *threadTask;

    for (long thread = 0; thread < threads; thread++){
        threadTask = &tasksHolder[thread];
        threadTask->rank = thread;
        threadTask->size = size;
        threadTask->threads = threads;

        pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
    }

    return tasksHolder;
}


void printArray(int arr[], int size) {
    /* Print array */

    for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
        printf("%d ", arr[arrayIndex]);
    printf("\n");
}


int main(int argc, char *argv[]) {

    // Initialize arguments
    int arraySize = 20;
    int totalThreads = 16;

    
    // Display input
    printf("\nInput array:\n");
    printArray(sortingArray, arraySize);
    

    // Initialize threads
    pthread_t *thread_handles;
    thread_handles = malloc(totalThreads * sizeof(pthread_t));

    // Create threads
    struct ThreadTask threadTasksHolder[totalThreads];
    *threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
    
    // Execute merge sort in each thread
    for (long thread = 0; thread < totalThreads; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);
    

    // Display output
    printf("\nSorted array:\n");
    printArray(sortingArray, arraySize);
    
    return 0;
}

【问题讨论】:

  • 你不能使用标准的 pthread 屏障?
  • @Shawn 我可以,但我也没能成功。

标签: c parallel-processing pthreads pthread-barriers


【解决方案1】:

正如@John Bollinger 所说,您的方法看起来非常困难,解决方案也同样复杂。但是如果你想实现一个屏障,我建议你把它放在mergeThreadResults 中的merge 之后。这样,您可以等待在该循环中工作的所有线程完成,然后再进行下一个。

要创建它,您需要在每次迭代中通过一个新的障碍。因为在每个周期执行合并的线程数都会减少。所以开始宣布一些全球性的障碍:

int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;

该标志用于为每次迭代创建一个屏障,我们将需要为每个循环创建多个 mergeBarrier。不要忘记在您的 main 函数中初始化它,并使用您将执行的迭代次数:mergeBarrier = realloc(mergeBarrier, howManyIterations);

然后我们可以像这样创建一个屏障:

        pthread_mutex_lock(&mutex);
        if (mergeCycleFlag != iter) { 
            mergeCycleFlag = iter;
            int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter+1;
            pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
        }
        pthread_mutex_unlock(&mutex);

        ... MERGE ...

        // Wait everyone finnish merging
        pthread_barrier_wait (&mergeBarrier[iter]);

请注意,我使用lock 创建屏障,因为我们不希望两个线程同时在这里搞乱。如果没有为此iter 设置屏障,我们将创建一个具有现在应该工作的线程数的屏障。另外,我已经更改了您的 breakpoint 语句,以适合计算我们期望执行 merge 的线程数。

经过一些调整,您的mergeThreadResults 应该是这样的:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    
    int nextThread, nextThreadRight;
    int groupSize = 2;

    while (groupSize <= threads) {
        if (myRank % groupSize != 0) { // Release threads that no long perform merges
            break;
        }

        nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
        nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
 
        printf("Merging threads %ld to %d\n", myRank, nextThread-1);

        // Init barrier with number of threads you will wait merging 
        pthread_mutex_lock(&mutex);  // Just one thread can set the barrier
        if (mergeCycleFlag != groupSize) { 
            mergeCycleFlag = groupSize;
            int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
            pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop);  // set barrier
        }
        pthread_mutex_unlock(&mutex);

        // Merge thread group with neighbour group
        merge(sortingArray, myLeft, myRight, nextThreadRight);

        // Wait everyone finnish merging
        pthread_barrier_wait (&mergeBarrier[groupSize]);

        myRight = nextThreadRight;
        groupSize = groupSize * 2;
    }

    // Merge thread 0
    if (myRank == 0 && nextThread < threads-1) {
        nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}

最后,要使此解决方案发挥作用,您需要每个线程在合并结果之前都已完成其工作。因此,您需要在 main 中的 join 之后调用它,或者在调用 sortManager 上的 mergeThreadResults 之前使用所有线程实现另一个屏障。

此外,更好的方法是让线程只等待它们将合并的其他线程。就像,线程 0 只等待 1。然后等待 2……等等。

【讨论】:

  • 成功了,谢谢!我刚刚将mergeBarrier[groupSize] 更改为mergeBarrier[counter]。并为每次迭代实现一个计数器,以便为屏障分配足够的内存。
【解决方案2】:

我正在尝试并行化合并排序算法。我正在做的是 划分每个线程的输入数组,然后合并线程 结果。

好的,但你的方法是不必要的困难。在合并过程的每一步,您希望一半线程等待另一半完成完成,一个线程等待另一个完成的最自然的方法是使用pthread_join() .如果您希望所有线程在同步后继续进行更多工作,那将是不同的,但在这种情况下,那些不负责任何更多合并的线程根本无事可做。

这是我尝试过的:

        pthread_mutex_lock(&mutex);
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            pthread_cond_broadcast(&cond_var);
        } else {
            while (pthread_cond_wait(&cond_var, &mutex) != 0);
        }
        pthread_mutex_unlock(&mutex);

这有几个问题,但最大的问题是障碍是工作的错误工具。在障碍物达到顶峰后,所有被阻止的线程都会继续进行。您希望 一半 线程继续进行,执行合并,但其他线程(应该)没有更多工作要做。您对breakpoint 的计算假设下半场不会返回障碍,而他们确实不应该这样做。如果你坚持使用屏障,那么没有合并的线程应该在通过屏障后终止。

此外,从 2 开始 iter 是不正确的。如果您使用屏障方法,那么 所有 在每次迭代中活动的线程必须在任何继续之前到达屏障,但如果 iter从 2 开始,然后在第一次迭代中,只有一半的线程必须到达屏障才能通过。

此外,您的简历使用不习惯,容易出现问题。 pthread_cond_wait() 的任何记录失败原因都无法通过尝试再次等待来挽救,因此您可能需要在出错时终止程序。另请注意,pthread_mutex_lock()pthread_mutex_unlock()pthread_cond_broadcast() 也都可能失败。

另一方面,CV 容易受到(非常罕见的)虚假唤醒的影响,因此从等待成功返回后,您需要在继续之前再次检查条件,并可能再次等待。更像是这样的:

        if (pthread_mutex_lock(&mutex) != 0) {
            perror("pthread_mutex_lock");
            abort();
        }
        counter++;
        int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
        if(counter >= breakpoint ) {
            counter = 0;
            if (pthread_cond_broadcast(&cond_var) != 0) {
                perror("pthread_cond_broadcast");
                abort();
            }
        } else {
            do {
                if (pthread_cond_wait(&cond_var, &mutex) != 0) {
                    perror("pthread_cond_wait");
                    abort();
                }
            } while (counter < breakpoint);
        }
        if (pthread_mutex_unlock(&mutex) != 0) {
            perror("pthread_mutex_unlock");
            abort();
        }

        // some threads must terminate at this point

【讨论】:

  • 感谢您的回答。我同意,这最终变得不必要地困难,所以我愿意接受更干净的建议,这些建议完全放弃我迄今为止所做的事情。另外,请问'CV'是什么意思?
  • @Artotim,CV = 条件变量。我已经提到了一种更简洁的方法:在线程间合并的每个阶段,负责合并的每个线程加入负责排序另一半合并输入的线程,以及不负责排序的线程。任何更多的合并终止。然后主线程只加入最后一个剩余的线程,或者即使它本身也参与排序和合并,也不会加入。不需要其他同步。但是,是的,从头开始可能是实现目标的最佳策略。
猜你喜欢
  • 2021-12-02
  • 1970-01-01
  • 1970-01-01
  • 2023-04-03
  • 2015-01-04
  • 2015-03-09
  • 2011-12-28
  • 2013-11-30
  • 1970-01-01
相关资源
最近更新 更多