【问题标题】:Thread safe bit array?线程安全位数组?
【发布时间】:2014-02-25 06:38:26
【问题描述】:

编辑问题:是否可以对位数组进行线程安全访问?我下面的实现似乎需要互斥锁,这违背了并行化的目的。

我的任务是使用 pthreads 创建双素数生成器的并行实现。我决定使用埃拉托色尼筛法,并划分标记已知素数因子的工作。我对线程获得的因素感到震惊。

例如,如果有 4 个线程: 线程一标记倍数 3, 11, 19, 27... 螺纹两个标记的倍数 5, 13, 21, 29... 螺纹两个标记的倍数 7, 15, 23, 31... 线程两个标记的倍数 9, 17, 25, 33...

我跳过了偶数和偶数基数。我使用了一个位数组,所以我将它运行到 INT_MAX。我遇到的问题是最大值为 1000 万,结果变化了大约 5 个数字,这是与已知文件相比有多少错误。结果一直变化到大约 10000 的最大值,其中它改变了 1 个数字。低于此值的任何内容均无错误。

起初我并不认为进程之间需要通信。当我看到结果时,我添加了一个 pthread 屏障,让所有线程在每组倍数之后都赶上。这并没有带来任何改变。 在 mark() 函数周围添加一个互斥锁可以解决问题,但这会减慢一切。

这是我的代码。希望有人能看到明显的东西。

#include <pthread.h>
#include <stdio.h>
#include <sys/times.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#include <string.h>
#include <limits.h>
#include <getopt.h>

#define WORDSIZE 32

struct t_data{
    int *ba;
    unsigned int val;
    int num_threads;
    int thread_id;
};  

pthread_mutex_t mutex_mark;

void  mark( int *ba, unsigned int k )
{
    ba[k/32] |= 1 << (k%32); 
}

void  mark( int *ba, unsigned int k )
{
    pthread_mutex_lock(&mutex_mark);
    ba[k/32] |= 1 << (k%32); 
    pthread_mutex_unlock(&mutex_mark);
}

void initBa(int **ba, unsigned int val)
{
    *ba = calloc((val/WORDSIZE)+1, sizeof(int));
}

void getPrimes(int *ba, unsigned int val)
{
    int i, p;
    p = -1;

    for(i = 3; i<=val; i+=2){
            if(!isMarked(ba, i)){
                    if(++p == 8){
                        printf(" \n");
                        p = 0;
                    }
                    printf("%9d", i);
            }   
    }
    printf("\n");
}

void markTwins(int *ba, unsigned int val)
{
    int i;
    for(i=3; i<=val; i+=2){
        if(!isMarked(ba, i)){
            if(isMarked(ba, i+2)){
                mark(ba, i);
            }

        }
    }
}

void *setPrimes(void *arg)
{
    int *ba, thread_id, num_threads, status;
    unsigned int val, i, p, start;
    struct t_data *data = (struct t_data*)arg;
    ba = data->ba;
    thread_id = data->thread_id;
    num_threads = data->num_threads;
    val = data->val;

    start = (2*(thread_id+2))-1; // stagger threads

    i=3; 
    for(i=3; i<=sqrt(val); i+=2){ 
        if(!isMarked(ba, i)){
            p=start;
            while(i*p <= val){
                    mark(ba, (i*p));
                p += (2*num_threads); 
            }       
        }       
    }
    return 0;
}

void usage(char *filename)
{
    printf("Usage: \t%s [option] [arg]\n", filename);
    printf("\t-q generate #'s internally only\n");
    printf("\t-m [size] maximum size twin prime to calculate\n");
    printf("\t-c [threads] number of threads\n");
    printf("Defaults:\n\toutput results\n\tsize = INT_MAX\n\tthreads = 1\n");
} 

int main(int argc, char **argv)
{
    int *ba, i, num_threads, opt, output;
    unsigned int val;

    output = 1;
    num_threads = 1;
    val = INT_MAX;  

    while ((opt = getopt(argc, argv, "qm:c:")) != -1){
        switch (opt){
            case 'q': output = 0;
                break;
            case 'm': val = atoi(optarg);
                break;
            case 'c': num_threads = atoi(optarg);
                break;
            default: 
                usage(argv[0]);
                exit(EXIT_FAILURE);
        }
    }

    struct t_data data[num_threads];    
    pthread_t thread[num_threads];
    pthread_attr_t attr;

    pthread_mutex_init(&mutex_mark, NULL);

    initBa(&ba, val);   

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);    

    for(i=0; i < num_threads; i++){
        data[i].ba = ba;
        data[i].thread_id = i;
        data[i].num_threads = num_threads;
        data[i].val = val;
        if(0 != pthread_create(&thread[i],
                                &attr,
                                setPrimes,
                                (void*)&data[i])){
            perror("Cannot create thread");
            exit(EXIT_FAILURE);
        }
    }

    for(i = 0; i < num_threads; i++){
        pthread_join(thread[i], NULL);
    }

    markTwins(ba, val);
    if(output)
        getPrimes(ba, val); 

    free(ba);
    return 0;
}

编辑:我摆脱了障碍,并在标记函数中添加了一个 mutex_lock。输出现在是准确的,但现在不止一个线程会减慢它的速度。有什么加快速度的建议吗?

【问题讨论】:

  • 一些处理器具有设置/重置指令,可以在一个原子操作中将位掩码应用于内存位置。你不妨检查一下你的指令集。

标签: c synchronization pthreads primes


【解决方案1】:

您当前的标记实现是正确的,但是锁定的粒度非常粗——整个数组只有一个锁定。这意味着您的线程一直在争夺该锁。

提高性能的一种方法是使锁更细粒度:每个“标记”操作只需要对数组中的单个整数进行独占访问,因此您可以为每个数组条目设置一个互斥体:

struct bitarray
{
    int *bits;
    pthread_mutex_t *locks;
};

struct t_data
{
    struct bitarray ba;
    unsigned int val;
    int num_threads;
    int thread_id;
};

void initBa(struct bitarray *ba, unsigned int val)
{
    const size_t array_size = val / WORDSIZE + 1;
    size_t i;

    ba->bits = calloc(array_size, sizeof ba->bits[0]);
    ba->locks = calloc(array_size, sizeof ba->locks[0]);

    for (i = 0; i < array_size; i++)
    {
        pthread_mutex_init(&ba->locks[i], NULL);
    }
}

void mark(struct bitarray ba, unsigned int k)
{
    const unsigned int entry = k / 32;

    pthread_mutex_lock(&ba.locks[entry]);
    ba.bits[entry] |= 1 << (k%32); 
    pthread_mutex_unlock(&ba.locks[entry]);
}

请注意,您的算法存在竞争条件:考虑num_threads = 4 的示例,因此线程 0 从 3 开始,线程 1 从 5 开始,线程 2 从 7 开始。线程 2 可以完全执行,标记每个 7 的倍数,然后从 15 重新开始,之前线程 0 或线程 1 有机会将 15 标记为 3 或 5 的倍数。然后线程 2 将做无用的工作,标记每个倍数15 个。


如果您的编译器支持 Intel 风格的原子内置函数,另一种选择是使用它们而不是锁:

void mark(int *ba, unsigned int k)
{
    __sync_or_and_fetch(&ba[k/32], 1U << k % 32); 
}

【讨论】:

  • 哇,我真的可以使用 UINT_MAX/32 互斥变量数组吗?你知道它们占用了多少空间吗? (对不起,我提到将它运行到 INT_MAX,但我的意思是 UINT_MAX)
  • 关于比赛条件。所有线程都使用相同的当前素数。它是在进程之间拆分的倍数。我不是很清楚。例如对于两个进程,如果当前已知素数为 3,则进程 1 标记 3*3、3*11、3*19、3*27...,进程 2 标记 3*5、3*13、3*21 , 3*29...等等。我现在想知道这是否会破坏锁定每个元素的目的,因为它们都会重叠很多。将 2-sqrt(max) 的所有素数预先确定,然后错开或分块分隔已知素数可能会更好?
  • @MatthewTanner:大小取决于您的实现(例如在 Linux x86 glibc 上,pthread_mutex_t 是 24 字节)。您当然可以选择您喜欢的任何粒度,从每个数组条目一个互斥锁到整个数组一个互斥锁 - 在计算互斥锁索引和互斥锁数组大小时,只需除以WORDSIZE * N 而不是WORDSIZE。如果您将素数预先确定为 sqrt(max),那么您可能只需为每个线程提供自己的私有位数组(假设您有足够的内存)并在最后组合位数组,根本不需要锁定。跨度>
  • 知道了,谢谢。我已经将素数预定到 sqrt(max)。我将尽可能地分离进程,可能通过将预定的素数分成块,然后使用锁的数量。我想到了单独的位数组,但这肯定会限制我的线程数量。
  • @MatthewTanner:如果你的编译器支持 Intel 风格的 atomic 内置函数,就像 gcc 那样,还有另一种选择。请参阅更新的答案。但是,如果您的线程都在共享内存的同一区域周围跺脚,您仍然会得到很多缓存线乒乓,因此使它们分散开的替代方案仍然会表现得更好。
【解决方案2】:

您的 mark() 函数不是线程安全的 - 如果两个线程尝试在同一 int 位置设置位,则可能会用另一个线程刚刚设置的位覆盖 0。

【讨论】:

  • 我摆脱了障碍,并在标记函数中添加了一个 mutex_lock。它是准确的,但现在每增加一个线程就会变慢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-23
  • 1970-01-01
  • 1970-01-01
  • 2013-05-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多