【问题标题】:Organize multitrheading datalogging [closed]组织多线程数据记录[关闭]
【发布时间】:2021-03-01 23:50:57
【问题描述】:

我正在为运行 Linux 发行版(使用 Yocto 构建)的嵌入式目标开发 C 项目。我是 Linux 嵌入式世界的新手,我必须构建一个数据记录系统。

我已经在学习如何使用线程,并且正在考虑项目组织。

这是我的想象:

  • 多个线程从不同的接口、CAN 总线、I2C...(不同的采样率)收集数据
  • 一个线程以 200 毫秒的采样率填充 csv 文件
  • 一个线程以 3 秒的采样率通过 http 请求发送数据
  • 线程将在 CAN 信息或外部事件上停止

我不知道组织这个项目的最佳方式是什么。我看到了两种方法,第一种是启动程序创建每个线程并在一段时间循环中等待事件监视以停止它们。第二种方式是启动程序将其他二进制文件作为线程执行。 这两种方式我不知道线程之间如何共享数据。

你能分享一下你的经验吗?

谢谢

编辑: 首先,非常感谢@Glärbo 的解释。学习多线程机制真的很有帮助。

我已经测试成功了。

对于未来的读者,我绘制了图表来说明@Glärbo 的答案。

main thread

productor-sensor thread

datalogger thread

【问题讨论】:

    标签: c linux multithreading code-organization


    【解决方案1】:

    我会做得更简单,使用简单的多生产者、单一消费者方法。

    假设每个数据项都可以用一个数值来描述:

    struct value {
        struct value   *next;  /* Forming a singly-linked list of data items */
        struct sensor  *from;  /* Identifies which sensor value this is */
        struct timespec when;  /* Time of sensor reading in UTC */
        double          value; /* Numerical value */
    };
    

    我将使用两个值列表:一个用于接收但未存储的传感器读数,另一个用于未使用的值桶。这样你就不需要动态分配或释放价值桶,除非你想(通过操纵未使用的列表)。

    两个列表都受互斥体保护。由于未使用列表可能为空,我们需要一个条件变量(每当向其中添加新的未使用值时都会发出信号),以便线程可以等待一个可用。接收到的列表同样需要一个条件变量,这样如果消费者(数据存储者)想要它们时它恰好是空的,它可以等待至少一个出现。

    static pthread_mutex_t  unused_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t   unused_wait = PTHREAD_COND_INITIALIZER;
    static struct value    *unused_list = NULL;
    
    static pthread_mutex_t  received_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t   received_wait = PTHREAD_COND_INITIALIZER;
    static struct value    *received_list = NULL;
    

    对于未使用的列表,我们需要三个助手:一个从头开始创建新的未使用的值项(您最初调用它是为了为每个传感器创建两个或三个值项,再加上一些),然后,如果您认为您需要它们(例如,如果您添加新的传感器运行时间):

    int unused_create(void)
    {
        struct value *v;
    
        v = malloc(sizeof *v);
        if (!v)
            return ENOMEM;
    
        v->from = NULL;
    
        pthread_mutex_lock(&unused_lock);
        v->next = unused_list;
        unused_list = v;
        pthread_cond_signal(&unused_wait);
        pthread_mutex_unlock(&unused_lock);
    
        return 0;
    }
    

    另外两个用于从列表中获取/放回价值项:

    struct value *unused_get(void)
    {
        struct value *v;
    
        pthread_mutex_lock(&unused_lock);
        while (!unused_list)
            pthread_cond_wait(&unused_wait, &unused_lock);
        v = unused_list;
        unused_list = unused_list->next;
        pthread_mutex_unlock(&unused_lock);
    
        v->from = NULL;
    
        return v;
    }
    
    void unused_put(struct value *v)
    {
        v->from = NULL;
    
        pthread_mutex_lock(&unused_lock);
        v->next = unused_list;
        unused_list = v;
        pthread_cond_signal(&unused_wait);
        pthread_mutex_unlock(&unused_lock);
    }
    

    上面的想法是,当from 成员为NULL 时,该项目未被使用(因为它不是来自任何传感器)。从技术上讲,我们不需要在每个阶段都将其清除为 NULL,但我喜欢彻底:它不像设置它是一项昂贵的操作。

    传感器访问生产者获取传感器读数,使用例如获取当前时间clock_gettime(CLOCK_REALTIME, &timespec),然后使用unused_get() 抓取一个新的未使用项目。 (顺序很重要,因为unused_get() 可能需要一些时间,如果没有免费物品。)然后,他们填写字段,并调用以下received_put() 将阅读添加到列表中:

    void received_put(struct value *v)
    {
        pthread_mutex_lock(&received_lock);
        v->next = received_list;
        received_list = v;
        pthread_mutex_signal(&received_wait);
        pthread_mutex_unlock(&received_lock);
    }
    

    只有一个线程定期收集所有接收到的传感器读数并存储它们。它可以保存一组最近的读数,并定期发送。与其重复调用一些 received_get() 直到没有更多尚未处理的接收值,我们应该使用一个返回它们的整个列表的函数:

    struct value *received_getall(void)
    {
        struct value *v;
        pthread_mutex_lock(&received_lock);
        while (!received_list)
            pthread_cond_wait(&received_wait, &received_lock);
        v = received_list;
        received_list = NULL;
        pthread_mutex_unlock(&received_lock);
        return v;
    }
    

    消费者线程,存储/发送摘要和读数,应该获取整个列表,然后一一处理。处理完每个项目后,应将它们添加到未使用列表中。换句话说,像

        struct value *all, v;
    
        while (1) {
            all = receive_getall();
            while (all) {
                v = all;
                all = all->next;
                v->next = NULL;
    
                /* Store/summarize value item v */
    
                unused_put(v);
            }
        }
    

    如您所见,当消费者线程处理传感器值项时,传感器线程可以为下一轮添加新读数,只要有足够的空闲值项桶可供使用。

    当然,您也可以在一次 malloc() 调用中分配大量值,但是您必须以某种方式记住每个值属于哪个值池才能释放它们。所以:

    struct owner {
        size_t          size;    /* Number of value's */
        size_t          used;    /* Number of value's not freed yet */
        struct value    value[];
    };
    struct value {
        struct value   *next;  /* Forming a singly-linked list of data items */
        struct owner   *owner; /* Part of which value array, NULL if standalone */
        struct sensor  *from;  /* Identifies which sensor value this is */
        struct timespec when;  /* Time of sensor reading in UTC */
        double          value; /* Numerical value */
    };
    
    int unused_add_array(const size_t size)
    {
        struct owner *o;
        struct value *v;
        size_t        i;
    
        o = malloc(sizeof (struct owner) + size * sizeof (struct value));
        if (!o)
            return ENOMEM;
    
        o->size = size;
        o->used = used;
        i = size - 1;
    
        pthread_mutex_lock(&unused_lock);
        o->value[i].next = unused_list;
        while (i-->0)
           o->value[i].next = o->value + i + 1;    
        unused_list = o->value[0];
        pthread_cond_broadcast(&unused_wait);
        pthread_mutex_unlock(&unused_lock);
        return 0;
    }
    
    /* Instead of unused_put(), call unused_free() to discard a value */
    void unused_free(struct value *v)
    {
        pthread_mutex_lock(&unused_lock);
        v->from = NULL;
    
        if (v->owner) {
            if (v->owner->used > 1) {
                v->owner->used--;
                return;
            }
            v->owner->size = 0;
            v->owner->used = 0;
            free(v->owner);
            return;
        }
        free(v);
        return;
    }
    

    unused_free() 使用unused_lock 的原因是我们必须确保在释放存储桶时没有其他线程正在访问它。否则,我们可以有一个竞态窗口,其他线程可以在我们 free()d 后使用该值。

    请记住,Linux C 库与大多数其他 C 库一样,不会在 free() 处将动态分配的内存返回给操作系统;只有当它足够大时才会返回内存。 (目前在 x86 和 x86-64 上,glibc 的限制约为 132,000 字节左右;任何较小的都留在进程堆中,用于满足未来的 malloc()/calloc()/realloc() 调用。)

    struct sensor 的内容由你决定,但我个人认为至少

    struct sensor {
        pthread_t   worker;
        int         connfd;  /* Device or socket descriptor */
        const char *name;    /* Some kind of identifier, perhaps header in CSV */
        const char *units;   /* Optional, could be useful */
    };
    

    加上可能的传感器读取间隔(例如,毫秒)。

    实际上,因为只有一个消费者线程,所以我会使用主线程。

    【讨论】:

    • @gilou:这里要注意的关键是,对于单个消费者线程(保存和发送数据),您拥有最简单的结构,新的传感器读数被连续缓冲。您可以为每个“数据转发器”添加一个队列,以便消费者将数据转发到第一个转发器,而不是将项目移动到未使用的列表中,然后转发到下一个,依此类推。这仅在转发器按更新频率降序排列时才有效;我担心它很脆弱。我相信,做所有保存/转发的单一消费者是最强大的。
    • @gilou:这些图表也符合我的想象。我建议您在实际代码中尝试(或单元测试)部分设计,甚至可以使用虚拟传感器读取生产者编写“测试设置”,以查看有多少读取数据包正在传输,如何收集 CSV 数据行(您可能希望使用一个数组,每个传感器有一列,然后更新它);但不是一次全部:一部分一部分。您可能会学到一些改变整体设计的东西。 (我的意思是,这种情况经常发生在我身上:每当我先进行实验时,我往往会做出更好的设计。)
    猜你喜欢
    • 1970-01-01
    • 2011-09-08
    • 2021-06-21
    • 2012-11-05
    • 1970-01-01
    • 2015-05-07
    • 1970-01-01
    • 2011-12-18
    • 1970-01-01
    相关资源
    最近更新 更多