我会做得更简单,使用简单的多生产者、单一消费者方法。
假设每个数据项都可以用一个数值来描述:
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
我将使用两个值列表:一个用于接收但未存储的传感器读数,另一个用于未使用的值桶。这样你就不需要动态分配或释放价值桶,除非你想(通过操纵未使用的列表)。
两个列表都受互斥体保护。由于未使用列表可能为空,我们需要一个条件变量(每当向其中添加新的未使用值时都会发出信号),以便线程可以等待一个可用。接收到的列表同样需要一个条件变量,这样如果消费者(数据存储者)想要它们时它恰好是空的,它可以等待至少一个出现。
static pthread_mutex_t unused_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t unused_wait = PTHREAD_COND_INITIALIZER;
static struct value *unused_list = NULL;
static pthread_mutex_t received_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t received_wait = PTHREAD_COND_INITIALIZER;
static struct value *received_list = NULL;
对于未使用的列表,我们需要三个助手:一个从头开始创建新的未使用的值项(您最初调用它是为了为每个传感器创建两个或三个值项,再加上一些),然后,如果您认为您需要它们(例如,如果您添加新的传感器运行时间):
int unused_create(void)
{
struct value *v;
v = malloc(sizeof *v);
if (!v)
return ENOMEM;
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
另外两个用于从列表中获取/放回价值项:
struct value *unused_get(void)
{
struct value *v;
pthread_mutex_lock(&unused_lock);
while (!unused_list)
pthread_cond_wait(&unused_wait, &unused_lock);
v = unused_list;
unused_list = unused_list->next;
pthread_mutex_unlock(&unused_lock);
v->from = NULL;
return v;
}
void unused_put(struct value *v)
{
v->from = NULL;
pthread_mutex_lock(&unused_lock);
v->next = unused_list;
unused_list = v;
pthread_cond_signal(&unused_wait);
pthread_mutex_unlock(&unused_lock);
}
上面的想法是,当from 成员为NULL 时,该项目未被使用(因为它不是来自任何传感器)。从技术上讲,我们不需要在每个阶段都将其清除为 NULL,但我喜欢彻底:它不像设置它是一项昂贵的操作。
传感器访问生产者获取传感器读数,使用例如获取当前时间clock_gettime(CLOCK_REALTIME, &timespec),然后使用unused_get() 抓取一个新的未使用项目。 (顺序很重要,因为unused_get() 可能需要一些时间,如果没有免费物品。)然后,他们填写字段,并调用以下received_put() 将阅读添加到列表中:
void received_put(struct value *v)
{
pthread_mutex_lock(&received_lock);
v->next = received_list;
received_list = v;
pthread_mutex_signal(&received_wait);
pthread_mutex_unlock(&received_lock);
}
只有一个线程定期收集所有接收到的传感器读数并存储它们。它可以保存一组最近的读数,并定期发送。与其重复调用一些 received_get() 直到没有更多尚未处理的接收值,我们应该使用一个返回它们的整个列表的函数:
struct value *received_getall(void)
{
struct value *v;
pthread_mutex_lock(&received_lock);
while (!received_list)
pthread_cond_wait(&received_wait, &received_lock);
v = received_list;
received_list = NULL;
pthread_mutex_unlock(&received_lock);
return v;
}
消费者线程,存储/发送摘要和读数,应该获取整个列表,然后一一处理。处理完每个项目后,应将它们添加到未使用列表中。换句话说,像
struct value *all, v;
while (1) {
all = receive_getall();
while (all) {
v = all;
all = all->next;
v->next = NULL;
/* Store/summarize value item v */
unused_put(v);
}
}
如您所见,当消费者线程处理传感器值项时,传感器线程可以为下一轮添加新读数,只要有足够的空闲值项桶可供使用。
当然,您也可以在一次 malloc() 调用中分配大量值,但是您必须以某种方式记住每个值属于哪个值池才能释放它们。所以:
struct owner {
size_t size; /* Number of value's */
size_t used; /* Number of value's not freed yet */
struct value value[];
};
struct value {
struct value *next; /* Forming a singly-linked list of data items */
struct owner *owner; /* Part of which value array, NULL if standalone */
struct sensor *from; /* Identifies which sensor value this is */
struct timespec when; /* Time of sensor reading in UTC */
double value; /* Numerical value */
};
int unused_add_array(const size_t size)
{
struct owner *o;
struct value *v;
size_t i;
o = malloc(sizeof (struct owner) + size * sizeof (struct value));
if (!o)
return ENOMEM;
o->size = size;
o->used = used;
i = size - 1;
pthread_mutex_lock(&unused_lock);
o->value[i].next = unused_list;
while (i-->0)
o->value[i].next = o->value + i + 1;
unused_list = o->value[0];
pthread_cond_broadcast(&unused_wait);
pthread_mutex_unlock(&unused_lock);
return 0;
}
/* Instead of unused_put(), call unused_free() to discard a value */
void unused_free(struct value *v)
{
pthread_mutex_lock(&unused_lock);
v->from = NULL;
if (v->owner) {
if (v->owner->used > 1) {
v->owner->used--;
return;
}
v->owner->size = 0;
v->owner->used = 0;
free(v->owner);
return;
}
free(v);
return;
}
unused_free() 使用unused_lock 的原因是我们必须确保在释放存储桶时没有其他线程正在访问它。否则,我们可以有一个竞态窗口,其他线程可以在我们 free()d 后使用该值。
请记住,Linux C 库与大多数其他 C 库一样,不会在 free() 处将动态分配的内存返回给操作系统;只有当它足够大时才会返回内存。 (目前在 x86 和 x86-64 上,glibc 的限制约为 132,000 字节左右;任何较小的都留在进程堆中,用于满足未来的 malloc()/calloc()/realloc() 调用。)
struct sensor 的内容由你决定,但我个人认为至少
struct sensor {
pthread_t worker;
int connfd; /* Device or socket descriptor */
const char *name; /* Some kind of identifier, perhaps header in CSV */
const char *units; /* Optional, could be useful */
};
加上可能的传感器读取间隔(例如,毫秒)。
实际上,因为只有一个消费者线程,所以我会使用主线程。