在递增写入索引与写入条目指针这两步之间存在竞争条件。
考虑这样一种情况:生产者 A 递增了写入索引,但随即用完了它的时间片。与此同时,生产者 B 再次递增写入索引,填充了下一个条目(注意,此时 A 还没有填充它自己的条目),并递增了信号量。如果消费者 C 在 A 之前被唤醒,它完全有理由认为 A 的条目已经填充好,于是将其取走;但该条目实际上尚未填充,因此取到的是 NULL。
换句话说:
Producer A          Producer B          Consumer C
write_pos++
                    write_pos++
                    sets buffer[]
                    sem_post()
                                        sem_wait()
                                        read_pos++
                                        uses buffer[]
sets buffer[]
sem_post()
                                        sem_wait()
                                        read_pos++
                                        uses buffer[]
生产者越多,出现上述情形的概率就越高。
解决方法很简单:再增加一个 write_pos2 计数器(即下面程序中的 wrindex),用它把各个写入者串行化,使它们按正确的顺序发布(post)信号量。
考虑以下示例程序:
#define _POSIX_C_SOURCE 200809L
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
/*
 * Lock-free multi-producer circular buffer of pointers.
 *
 * A slot holding NULL is empty. Producers claim a sequence number from
 * wrnext and fill slot (seq % size). They then publish in claim order by
 * advancing wrindex and posting the semaphore. Consumers wait on the
 * semaphore and claim slots through rdindex. After creation, the index
 * fields are only modified through GCC __sync builtins.
 */
typedef struct {
sem_t semaphore;           /* Posted once per published entry, waited on once per get */
uint64_t size;             /* Number of slots in entry[] */
volatile uint64_t wrnext;  /* Next sequence number a producer will claim */
volatile uint64_t wrindex; /* Count of published entries; advanced in claim order */
volatile uint64_t rdindex; /* Next sequence number a consumer will claim */
void *entry[];             /* Flexible array of `size` slots; NULL marks an empty slot */
} cbuffer;
/*
 * Tear down a buffer obtained from cbuffer_create(); passing NULL is allowed.
 *
 * The header fields are cleared and the semaphore destroyed before the memory
 * is released. The result is always NULL, which lets callers write
 * `cb = cbuffer_destroy(cb);` to drop their reference in one step.
 */
static cbuffer *cbuffer_destroy(cbuffer *const cbuf)
{
    if (cbuf == NULL)
        return NULL;

    cbuf->size = 0;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;
    sem_destroy(&cbuf->semaphore);
    free(cbuf);

    return NULL;
}
/*
 * Allocate a circular buffer with room for `size` entry pointers.
 *
 * All slots start out NULL (empty), the semaphore count starts at zero, and
 * all indices start at zero. The semaphore is process-private (pshared == 0).
 *
 * Returns the new buffer, or NULL with errno set:
 *   EINVAL  if size < 2,
 *   ENOMEM  if the byte count would overflow size_t or malloc() fails,
 *   or whatever errno sem_init() reported if the semaphore cannot be created.
 */
static cbuffer *cbuffer_create(const size_t size)
{
    cbuffer *cbuf;

    if (size < 2) {
        errno = EINVAL;
        return NULL;
    }

    /* Reject sizes for which the header + slot array would overflow size_t. */
    if (size > (SIZE_MAX - sizeof *cbuf) / sizeof cbuf->entry[0]) {
        errno = ENOMEM;
        return NULL;
    }

    cbuf = malloc(sizeof *cbuf + size * sizeof cbuf->entry[0]);
    if (!cbuf) {
        errno = ENOMEM;
        return NULL;
    }

    /* A buffer with an uninitialized semaphore is unusable: fail instead. */
    if (sem_init(&cbuf->semaphore, 0, 0) == -1) {
        const int saved_errno = errno;
        free(cbuf);
        errno = saved_errno;
        return NULL;
    }

    memset(cbuf->entry, 0, size * sizeof cbuf->entry[0]);
    cbuf->size = size;
    cbuf->wrnext = 0;
    cbuf->wrindex = 0;
    cbuf->rdindex = 0;
    return cbuf;
}
/*
 * Append `entry` to the buffer, spinning while its slot is still occupied.
 *
 * Each call claims a unique sequence number from wrnext and waits until the
 * matching slot is NULL, then installs `entry` there. It then waits until
 * wrindex equals its sequence number before advancing wrindex and posting the
 * semaphore. This second wait forces producers to publish strictly in claim
 * order, so wrindex never moves past a slot that has not been filled yet.
 *
 * A NULL `entry` is indistinguishable from an empty slot, so callers should
 * only add non-NULL pointers.
 */
static void cbuffer_add(cbuffer *const cbuf, void *const entry)
{
    const uint64_t seq = __sync_fetch_and_add(&cbuf->wrnext, (uint64_t)1);
    void **const slot = &cbuf->entry[seq % cbuf->size];

    /* Buffer full: wait for a consumer to empty (NULL) our slot. */
    for (;;) {
        if (__sync_bool_compare_and_swap(slot, NULL, entry))
            break;
    }

    /* Wait for every earlier producer to publish, then publish ours. */
    for (;;) {
        if (__sync_bool_compare_and_swap(&cbuf->wrindex, seq, seq + (uint64_t)1))
            break;
    }

    /* TODO: check for -1 and errno == EOVERFLOW */
    sem_post(&cbuf->semaphore);
}
/*
 * Remove and return the oldest published entry, blocking until one exists.
 *
 * The semaphore is waited on *before* a read index is claimed. Producers
 * post only after advancing wrindex past an already-filled slot, so by the
 * time a consumer claims index r, at least r + 1 entries have been published
 * and slot r is guaranteed to be filled. If the index were claimed first, a
 * second consumer that won the semaphore could read a slot whose producer had
 * not finished yet and get NULL.
 *
 * Returns the entry, or NULL if sem_wait() fails with an error other than
 * EINTR (errno is left as set by sem_wait()).
 */
static void *cbuffer_get(cbuffer *const cbuf)
{
    uint64_t rdindex;

    /* Wait for a published entry; a signal interruption is not a post. */
    while (sem_wait(&cbuf->semaphore) == -1) {
        if (errno != EINTR)
            return NULL;
    }

    /* Claim the index of the oldest unclaimed entry. */
    rdindex = __sync_fetch_and_add(&cbuf->rdindex, (uint64_t)1);

    /* Atomically fetch the slot and clear it (x & NULL == NULL), which
     * releases the slot to the producer waiting to refill it. */
    return __sync_fetch_and_and(&cbuf->entry[rdindex % cbuf->size], NULL);
}
/* The buffer under test; main() creates it before starting threads and
 * destroys it after joining them. Static storage starts out NULL. */
static cbuffer *cb;
/* Stop flag polled by main() and every worker thread; starts out 0.
 * NOTE(review): volatile is not a C11 synchronization primitive; this relies
 * on the compiler/hardware in practice. */
static volatile int done;
/*
 * Consumer thread body: take entries from the global buffer `cb` until
 * `done` is set.
 *
 * Producers never add NULL, so a NULL entry signals a buffer failure (for
 * example, an unfilled slot being handed out). In that case the thread
 * reports on stdout, sets `done` so every thread stops, and exits.
 *
 * `payload` carries the consumer's numeric id (cast from long).
 * Always returns NULL.
 */
void *consumer_thread(void *payload)
{
    const long id = (long)payload;
    unsigned long count = 0UL;
    void *entry;

    /* Let main() cancel this thread at any point during shutdown. */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        entry = cbuffer_get(cb);
        count++;
        if (!entry) {
            printf("Consumer %ld: NULL pointer at %lu encountered!\n", id, count);
            /* The report went to stdout, so stdout (not stderr) must be
             * flushed before the threads are torn down. */
            fflush(stdout);
            done = 1;
            return NULL;
        }
    }
    return NULL;
}
/*
 * Producer thread body: keep adding dummy pointers to the global buffer `cb`
 * until `done` is set.
 *
 * The values cycle through 256..511 (256 + count mod 256). They are never
 * NULL, because NULL is the buffer's empty-slot marker and consumers treat
 * it as an error. Always returns NULL.
 */
void *producer_thread(void *payload __attribute__((unused)))
{
    unsigned long count = 0UL;

    /* Let main() cancel this thread even while it spins in cbuffer_add(). */
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    while (!done) {
        cbuffer_add(cb, (void *)(256UL + (count & 255UL)));
        /* Advance so successive entries differ; without this every
         * entry would be the same value, 256. */
        count++;
    }
    return NULL;
}
/*
 * Stress-test driver for the circular buffer.
 *
 * Usage: PROG SIZE PRODUCERS CONSUMERS
 *
 * Starts CONSUMERS consumer threads and PRODUCERS producer threads sharing a
 * buffer of SIZE slots. It then polls until SIGINT/SIGTERM arrives or a
 * consumer sets `done`. Finally it cancels and joins all threads and frees
 * everything.
 *
 * Returns 0 after a normal stop, or 1 on bad arguments or allocation failure.
 * Exits with status 1 if a thread cannot be created.
 */
int main(int argc, char *argv[])
{
    pthread_attr_t attrs;
    pthread_t *producer_id;
    pthread_t *consumer_id;
    sigset_t blocked;
    siginfo_t info;
    struct timespec timeout;
    int producers, consumers, size, i, result;
    char dummy;

    if (argc != 4 || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
        fprintf(stderr, "\n");
        fprintf(stderr, "Usage: %s [ -h | --help ]\n", argv[0]);
        fprintf(stderr, " %s SIZE PRODUCERS CONSUMERS\n", argv[0]);
        fprintf(stderr, "\n");
        return 1;
    }

    /* The trailing " %c" makes sscanf() return 2 when junk follows the
     * number, so "!= 1" rejects it. */
    if (sscanf(argv[1], " %d %c", &size, &dummy) != 1 || size < 2) {
        fprintf(stderr, "%s: Invalid circular buffer size.\n", argv[1]);
        return 1;
    }
    if (sscanf(argv[2], " %d %c", &producers, &dummy) != 1 || producers < 1) {
        fprintf(stderr, "%s: Invalid number of producer threads.\n", argv[2]);
        return 1;
    }
    if (sscanf(argv[3], " %d %c", &consumers, &dummy) != 1 || consumers < 1) {
        fprintf(stderr, "%s: Invalid number of consumer threads.\n", argv[3]);
        return 1;
    }

    cb = cbuffer_create(size);
    producer_id = malloc((size_t)producers * sizeof *producer_id);
    consumer_id = malloc((size_t)consumers * sizeof *consumer_id);
    if (!cb || !producer_id || !consumer_id) {
        fprintf(stderr, "%s.\n", strerror(ENOMEM));
        /* Release whichever allocations did succeed (all accept NULL). */
        cb = cbuffer_destroy(cb);
        free(producer_id);
        free(consumer_id);
        return 1;
    }

    /* Block SIGINT/SIGTERM before any thread exists, so every thread
     * inherits the mask and the signals are only consumed by sigtimedwait(). */
    sigemptyset(&blocked);
    sigaddset(&blocked, SIGINT);
    sigaddset(&blocked, SIGTERM);
    sigprocmask(SIG_BLOCK, &blocked, NULL);

    pthread_attr_init(&attrs);
    /* NOTE(review): return value unchecked; if 32 KiB is rejected (e.g. below
     * PTHREAD_STACK_MIN) the default stack size is presumably kept. */
    pthread_attr_setstacksize(&attrs, 32768);

    /* Start consumer threads. */
    for (i = 0; i < consumers; i++) {
        result = pthread_create(&consumer_id[i], &attrs, consumer_thread, (void *)(1L + (long)i));
        if (result) {
            fprintf(stderr, "Cannot start consumer threads: %s.\n", strerror(result));
            exit(1);
        }
    }

    /* Start producer threads. */
    for (i = 0; i < producers; i++) {
        result = pthread_create(&producer_id[i], &attrs, producer_thread, (void *)(1L + (long)i));
        if (result) {
            fprintf(stderr, "Cannot start producer threads: %s.\n", strerror(result));
            exit(1);
        }
    }

    pthread_attr_destroy(&attrs);

    printf("Press CTRL+C or send SIGTERM to process %ld to stop testing.\n", (long)getpid());
    fflush(stdout);

    while (!done) {
        timeout.tv_sec = (time_t)0;
        timeout.tv_nsec = 10000000L; /* 0.010000000 seconds */
        result = sigtimedwait(&blocked, &info, &timeout);

        /* Timed out, or interrupted without one of our signals (on Linux this
         * happens e.g. after SIGSTOP + SIGCONT): keep polling rather than
         * treating it as a stop request. */
        if (result == -1 && (errno == EAGAIN || errno == EINTR))
            continue;

        /* SIGINT/SIGTERM arrived, or sigtimedwait() failed unexpectedly. */
        done = 1;
        break;
    }

    printf("Exiting...\n");
    fflush(stdout);

    for (i = 0; i < producers; i++)
        pthread_cancel(producer_id[i]);
    for (i = 0; i < consumers; i++)
        pthread_cancel(consumer_id[i]);
    for (i = 0; i < producers; i++)
        pthread_join(producer_id[i], NULL);
    for (i = 0; i < consumers; i++)
        pthread_join(consumer_id[i], NULL);

    cb = cbuffer_destroy(cb);
    free(producer_id);
    free(consumer_id);
    return 0;
}
我可能有疏漏,但在使用任意数量的生产者(显然只能有一个消费者)运行上述程序时,我从未遇到过 NULL 指针。您也可以很容易地添加一些逻辑来校验取出的指针。
我认为,即使在没有竞争(contention)的情况下,您的代码也会频繁地自旋。
我个人会考虑改用两个链表:一个存放未使用/空闲的槽位,另一个存放已添加的条目。(如果您所指向的条目本身就以一个 next 指针字段开头,那么只需要“已使用”链表即可;我自己更喜欢这种做法。)
生产者总是从空闲链表中取出第一个节点,并将其加入已使用链表;消费者则一次性取走整个已使用链表。所有这些操作都使用简单的 do { } while (!__sync_bool_compare_and_swap()); 循环(对于 GCC 4.7 及更高版本,也可以使用 do { } while (!__atomic_compare_exchange());),在通常情况下循环只执行一次。代码大致如下(未经测试):
/*
 * Singly-linked list node used by add_one(), get_one() and get_all().
 * The `next` link comes first; payload fields follow it.
 */
struct node {
struct node *next; /* Next node in the list, or NULL at the end */
/* whatever data here */
};
/*
 * Push `item` onto the front of the lock-free list whose head is *list.
 *
 * Each attempt re-reads the head and links `item` to it before the CAS. The
 * CAS therefore installs `item` only if nobody changed the head in between.
 * `item->next` must be the current head itself (not head->next); otherwise
 * the expected value never matches on a non-empty list and the loop spins
 * forever.
 *
 * NOTE(review): `volatile` in the parameter type qualifies the nodes, not the
 * head pointer; the __sync builtin is a full barrier, so the head is reloaded
 * on every attempt.
 */
void add_one(volatile struct node **const list, struct node *item)
{
    volatile struct node *head;

    do {
        head = *list;
        item->next = (struct node *)head;
    } while (!__sync_bool_compare_and_swap(list, head, item));
}
/*
 * Pop and return the first node of the lock-free list *list, or NULL if the
 * list is empty. The returned node is detached (its `next` is reset to NULL).
 *
 * NOTE(review): this classic CAS pop reads first->next before the CAS. If
 * another thread pops `first` in the meantime (and frees it, or pushes it
 * back), the read can touch freed memory or the CAS can succeed with a stale
 * `next` (the ABA problem). It looks safe only when a single thread pops from
 * a given list; confirm before sharing a list, such as a free list, between
 * several poppers.
 */
struct node *get_one(volatile struct node **const list)
{
    struct node *first, *next;

    do {
        first = (struct node *)*list;
        next = (first) ? first->next : NULL;
    } while (!__sync_bool_compare_and_swap(list, first, next));

    if (first)
        first->next = NULL;
    return first;
}
/*
 * Atomically detach the entire list *list (leaving it empty) and return the
 * detached chain reversed. A list built by pushing at the head therefore
 * comes back oldest-first. Returns NULL if the list was empty.
 */
struct node *get_all(volatile struct node **const list)
{
    struct node *taken;
    struct node *reversed = NULL;

    /* Swap the head for NULL; retry if another thread changed it meanwhile. */
    do {
        taken = (struct node *)*list;
    } while (!__sync_bool_compare_and_swap(list, taken, NULL));

    /* In-place reversal: move each node from `taken` onto the front of `reversed`. */
    for (; taken != NULL; ) {
        struct node *const following = taken->next;
        taken->next = reversed;
        reversed = taken;
        taken = following;
    }
    return reversed;
}
请注意,上面的 get_all() 会把链表反转,使最早加入的条目排在返回链表的最前面。这样消费者就能很方便地按加入顺序处理所有条目,而且在常见情况下开销极小。
有问题吗?