【问题标题】:Filling and saving shared buffer between threads填充和保存线程之间的共享缓冲区
【发布时间】:2019-06-18 14:11:51
【问题描述】:

我正在使用检索 I/Q 数据的 API。调用函数bbGetIQ(m_handle, &pkt);填充缓冲区。这是一个线程循环,而用户没有输入“停止”。 Pkt 是一个结构,使用的缓冲区是pkt.iqData = &m_buffer[0];,它是一个浮点向量。向量的大小是 5000,每次我们循环缓冲区时都会填充 5000 个值。

我想将缓冲区中的数据保存到一个文件中,我是在调用 bbgetIQ 之后立即执行此操作的,但这样做是一项耗时的任务,数据检索速度不够快导致 API删除数据,以便它可以继续填充其缓冲区。

这是我的代码的样子:


void Acquisition::recordIQ(){

    int cpt = 0;
    ofstream myfile;


    while(1){

        while (keep_running)
        {   

            cpt++;

            if(cpt < 2)
                myfile.open ("/media/ssd/IQ_Data.txt");


            bbGetIQ(m_handle, &pkt); //Retrieve I/Q data


            //Writing content of buffer into the file.
            for(int i=0; i<m_buffer.size(); i++)
                myfile << m_buffer[i] << endl;


        }
        cpt = 0;
        myfile.close();
    }
}

然后我尝试只在我们离开循环时写入文件:



void Acquisition::recordIQ(){

    int cpt = 0;
    ofstream myfile;
    int next=0;
    vector<float> data;


    while(1){

        while ( keep_running)
        {   
            if(keep_running == false){

                myfile.open ("/media/ssd/IQ_Data.txt");

                for(int i=0; i<data.size(); i++)
                    myfile << data[i] << endl;

                myfile.close();
                break;
            }

            cpt++;

            data.resize(next + m_buffer.size());

            bbGetIQ(m_handle, &pkt); //retrieve data

            std::copy(m_buffer.begin(), m_buffer.end(), data.begin() + next); //copy content of the buffer into final vector

            next += m_buffer.size(); //next index

        }

        cpt = 0;

    }
}

我不再从 API 中丢失数据,但问题是我受到 data 向量大小的限制。例如,我不能让它整夜检索数据。

我的想法是制作 2 个线程。一个将检索数据,另一个将数据写入文件。两个线程将共享一个循环缓冲区,其中第一个线程将填充缓冲区,第二个线程将读取缓冲区并将内容写入文件。因为它是一个共享缓冲区,我想我应该使用互斥锁。

我是多线程和互斥锁的新手,所以这是个好主意吗?我真的不知道从哪里开始以及消费者线程如何在生产者填充缓冲区时读取缓冲区。读取时锁定缓冲区会导致 API 丢失数据吗? (因为它无法将其写入循环缓冲区)。

编辑:因为我希望我的记录线程在后台运行,这样我就可以在记录时做其他事情,我将其分离,用户可以通过将条件 keep_running 设置为来启动记录真的。


thread t1(&Acquisition::recordIQ, &acq);
t1.detach();

【问题讨论】:

    标签: c++ multithreading mutex circular-buffer


    【解决方案1】:

    你需要使用这样的东西 (https://en.cppreference.com/w/cpp/thread/condition_variable):

    全局变量:

    std::mutex m;
    std::condition_variable cv;
    std::vector<std::vector<float>> datas;
    bool keep_running = true, start_running = false;
    

    写线程:

    void writing_thread()
    {
        myfile.open ("/media/ssd/IQ_Data.txt");
    
        while(1) {
            // Wait until main() sends data
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, []{return keep_running && !datas.empty();});
            if (!keep_running) break;
    
            auto d = std::move(datas); 
            lk.unlock();
    
            for(auto &entry : d) {
                for(auto &e : entry)
                    myfile << e << endl;             
            }
        }
    }
    

    发送线程:

    void sending_thread() {
        while(1) {
            {
                std::unique_lock<std::mutex> lk(m);
                cv.wait(lk, []{return keep_running && start_running;});
                if (!keep_running) break;
            }
    
            bbGetIQ(m_handle, &pkt); //retrieve data
    
            std::vector<float> d = m_buffer;
    
            {
                std::lock_guard<std::mutex> lk(m);
                if (!keep_running) break;
                datas.push_back(std::move(d));
            }
            cv.notify_one();
        }
    }
    void start() {
        {
            std::unique_lock<std::mutex> lk(m);
            start_running = true;
        }
        cv.notify_all();
    }
    void stop() {
        {
            std::unique_lock<std::mutex> lk(m);
            start_running = false;
        }
        cv.notify_all();
    }
    void terminate() {
        {
            std::unique_lock<std::mutex> lk(m);
            keep_running = false;
        }
        cv.notify_all();
    
        thread1.join();
        thread2.join();
    }
    

    简而言之: 发送线程接收来自任何数据的数据,锁定互斥锁mt 并将数据移动到datas 存储。然后它使用cv 条件变量来通知等待线程,有事可做。写入线程等待条件变量发出信号,然后锁定互斥锁mt移动数据从datas全局变量到本地,然后释放互斥锁并继续写入接收到的数据归档。关键是让 mutexed 锁定尽可能短的时间。

    编辑: 要终止整个事情,您需要将 keep_running 设置为 false。 然后致电cv.notify_all()然后加入相关线程。秩序很重要。您需要加入线程,因为写入线程可能仍在写入数据的过程中。

    编辑2: 添加了延迟启动。现在创建两个线程,一次运行sending_thread,另一次运行writing_thread。调用start() 启用处理,调用stop() 停止处理。

    【讨论】:

    • 所以如果我理解得很好,发送线程将永远不会停止?它将继续接收和移动数据,然后通知写入线程,这将是唯一一个等待的线程?我还需要注意keep _running 条件,因为它用于两个线程吗?
    • 设置条件为false,then使用cv.notify_all()then加入线程(等待线程销毁)就完成了。订单很重要。您需要加入线程以等待它们完成,因为写入线程可能仍在写入数据的过程中。
    • 问题是我希望我的发送线程在后台工作,因为我正在并行执行其他操作。这就是为什么当我需要使用keep_runningcondition 检索数据时,我将其分离并“激活”它。
    • 好吧,我开始明白了!这适用于分离的线程吗?因为如果我加入发送线程,它将阻塞主线程,直到发送线程终止,并且在发送线程处于活动状态时我将无法进行其他处理。编辑:您的代码中似乎有括号问题。
    • 不确定,分离线程是什么意思。目前,您可以只创建两个线程并让它们存在。当你准备好调用bbGetIQ时,调用我的函数start(),当你想停止处理时,调用stop()。线程将仍然准备就绪,但正在等待,因此不会消耗任何处理器时间。当您决定退出应用程序时,您必须执行此终止程序。我将添加terminate() 只是为了更好的衡量。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-01
    • 1970-01-01
    • 2011-05-22
    相关资源
    最近更新 更多