【Question title】: c++ Fast IPC - boost message queue seems slow?
【Posted】: 2015-09-10 09:41:33
【Question description】:

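I have a problem that I can't seem to solve on my own. I have Process1, which computes data in a while loop. This process must execute as fast as possible. I need the data computed in Process1 for later analysis, but writing it to a file would be slow.

I have never used IPC, but I thought it would be a good way to keep the data from Process1 in memory and access it from another, less time-critical Process2 (a standalone program) that writes the data to a file.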

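I have created a small test program (to learn about IPC) so that:

  1. Process1 runs even if Process2 is not available; in that case it skips IPC and just carries on.
  2. When Process2 runs, it waits for Process1; once Process1 starts, it fetches the data and then writes it to disk.
  3. Process2 fetches only a fixed amount of data, maxRunTime (10 samples in the code below).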

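The current program is very slow: it runs 6 times slower when sending the messages via IPC. At the moment I only pass three floats per "TimeStep", but that could become 100, and RunTime could be 10,000.

To do: I would be glad if someone could point me in the right direction. The code below works, maybe by luck, because it is not pretty.

I need a solution that is as fast as possible, but it does not have to be real-time. Since I am not a professional programmer, I also need to limit the complexity, because I need to understand what I am doing.

I hope someone can help.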

Code:

  1. Using Boost 1.59 and MSVC 11.0_x86
  2. Two separate programs (console apps)

Process1:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <cstdlib>  // rand
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 


bool InitCreateMsgQ()
{
    bool initOK = false;
    //Create a msgQ for parsing data
    try
    {
        message_queue::remove("msgQData");
        //Create a message_queue.
        message_queue mqData
        (open_or_create     //create q 
        ,"msgQData"         //name
        ,1000000                //max message number
        ,sizeof(float)      //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
//Create State
    try
    {
        message_queue::remove("msgState");
        //Create a message_queue.
        message_queue mqState
        (open_or_create     //create q 
        ,"msgState"     //name
        ,1                  //max message number
        ,sizeof(int)        //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
    return initOK;
}
bool SetState(int state)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only open q
        ,"msgState"  //name
        );

        timeout = !mqState.timed_send(&state, sizeof(int), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgState");
        timeout = true;
    }
    return timeout;
}
bool SetData(float data)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only open q
        ,"msgQData"  //name
        );

        timeout = !mqData.timed_send(&data, sizeof(float), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
        //mqData.send(&data, sizeof(float), 0);
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgQData");
        timeout = true;
    }
    return timeout;
}

int main ()
{
    time_t start,end;

    int runTime = 0; //just for testing
    int dummyState = 2;
    float x;
    int state = 0;
    if (InitCreateMsgQ()){state = 1;} //If all msQ ok set state 1
    if (SetState(state)){state = 0;}// If timeout to set state go to state 0
    //Do twice to get error if observer is not started
    if (SetState(dummyState)){state = 0;}// Set Dummy state for observer
                                         // If timeout to set state go to state 0

    time (&start);
    //Runtime!
    while(runTime<1000)
    {
        switch (state) 
        {
            case 0:
                state = 0;//force next state 0 - should not be needed
                //Do nothing and break loop if monitor tool is not ready                
                break;
            case 1:
                state = 1;
                cout << "Try SEND DATA" << endl;
                for (int i = 0; i < 3; i++)
                {
                    x = rand() % 100;
                    if (SetData(x)){state = 0;}
                }               
                break;
            default:
                break;
        }
        runTime++;
        cout << "runTime: " << runTime <<" state: " << state << endl;
    }

    message_queue::remove("msgQData");
    message_queue::remove("msgState");
    cout << "done - state: " << state << endl;

    time (&end);
    double dif = difftime (end,start);
    printf ("Elapsed time is %.2lf seconds.", dif );

    getchar();
}

Process2:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>


#pragma comment(lib, "user32.lib")

using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 

ofstream debugOut;      // Output file for debug    (DEBUG)

int getState()
{
    int state = 0;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only open q
        ,"msgState"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);    
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){state = 0;}

    return state;
}
float getData()
{
    float Data = -123456;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only open q
        ,"msgQData"  //name
        );

        unsigned int priority;
        message_queue::size_type recvd_size;

        //Receive the data
        //mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
        timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }

    if(timeout){Data = -123456;}

    return Data;
}

int main ()
{
    int state = 0;
    int maxRunTime = 10;
    float Data;
    float DataArray[100000];

    debugOut.open("IPCWriteTest.txt", std::ios::trunc);
    debugOut.close();

    while(true)
    {
        switch (state) 
        {
            case 0: 
                //Do nothing - data not ready state
                if(getState() == 1)
                {
                    state = 1;
                    cout << "State: 1" <<endl;
                } //If all msQ ok set state 1
                else{state = 0;}
                break;
            case 1:
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    cout << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        Data = getData();
                        cout << Data << "   ";
                        DataArray[runTime]=Data;
                    }   
                    cout << endl;
                }

                debugOut.open("IPCWriteTest.txt", std::ios::app);
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    debugOut << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < 3; i++)
                    {
                        debugOut << DataArray[runTime] << " ";

                    }   
                    debugOut << endl;
                }
                debugOut.close();
                state = 0;
                break;
            default:
                break;
        }
    }

    std::cout << "done" << endl;
    getchar();
}

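【Question comments】:

  • Consider using threads instead of processes for this.
  • How many consumer/producer processes will there be?
  • Only one producer and one consumer. The idea is simply to let process1 run as fast as possible computing the data, and let another process, process2, handle it, in this case by writing it to a file. They have to be two separate programs.
  • You could take a look at spsc_queue and shared memory: stackoverflow.com/questions/22207546/…
  • Why are you using a queue at all? Why not put the data in shared memory and control access with a mutex or semaphore?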
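The spsc_queue plus shared memory suggestion can be sketched as below. This is a hand-rolled single-producer/single-consumer ring buffer, not Boost.Lockfree's `boost::lockfree::spsc_queue`, and the class name `SpscRing` is hypothetical. It assumes `std::atomic<std::size_t>` is lock-free on the target platform, which is what would make it plausible to construct the object inside a shared-memory segment mapped by both processes (the mapping itself is not shown).

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Minimal single-producer/single-consumer ring buffer (sketch).
// Fixed capacity, no heap pointers, only atomics for coordination, so in
// principle the whole object could live in a shared-memory segment
// (assumption: std::atomic<std::size_t> is lock-free on the target).
template <typename T, std::size_t Capacity>
class SpscRing {
    static_assert(Capacity >= 2, "one slot stays empty to tell full from empty");
    T buf_[Capacity];
    std::atomic<std::size_t> head_{0};  // next slot to read (written by consumer only)
    std::atomic<std::size_t> tail_{0};  // next slot to write (written by producer only)

public:
    // Producer side: returns false instead of blocking when the buffer is full.
    bool push(const T& v) {
        const std::size_t t = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (t + 1) % Capacity;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        buf_[t] = v;
        tail_.store(next, std::memory_order_release);  // publish the element
        return true;
    }

    // Consumer side: returns false when there is nothing to read.
    bool pop(T& out) {
        const std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire)) return false;  // empty
        out = buf_[h];
        head_.store((h + 1) % Capacity, std::memory_order_release);  // free the slot
        return true;
    }
};
```

`push()` never blocks, so a full buffer costs the producer one failed comparison instead of a 1 ms `timed_send` timeout; what to do with a rejected sample is left to the caller. Usable capacity is `Capacity - 1`.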

Tags: c++ windows performance boost ipc


【Solution 1】:

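You are opening the queue for every operation.

You should try to open it once and pass a reference to all the code that needs it (usually you would store it as a member of a class).

Also, having separate queues is a recipe for slowness. It looks to me like you are "abusing" mqState as an interprocess condition variable (interprocess_condition) or a semaphore.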
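To illustrate the alternative: a "producer is ready" flag is normally a boolean guarded by a mutex plus a condition variable, not a one-slot message queue. The sketch below uses `std::mutex` and `std::condition_variable`, so it only works between threads of one process; across two programs the analogous types would be `boost::interprocess::interprocess_mutex` and `boost::interprocess::interprocess_condition` placed in shared memory, which is an assumption about how one would port it and is not shown. `ReadySignal` is a hypothetical name.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// In-process sketch of a "producer is ready" signal: a flag guarded by a
// mutex plus a condition variable, instead of a one-slot message queue.
class ReadySignal {
    std::mutex m_;
    std::condition_variable cv_;
    bool ready_ = false;

public:
    // Producer: mark ready and wake any waiting observer.
    void set() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ready_ = true;
        }
        cv_.notify_all();
    }

    // Observer: wait up to `timeout`; true if the flag was set in time.
    bool wait_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_);
        return cv_.wait_for(lock, timeout, [this] { return ready_; });
    }
};
```

The predicate overload of `wait_for` rechecks the flag after spurious wakeups, which a queue-based "state" message does not give you for free.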

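In any case, translating exceptions into tedious error codes is not very productive: you are doing by hand what exception handling is supposed to do for you.

Also, tracing debug messages to standard output slows the program down considerably, especially on Windows.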

Notes on the observer

The same points apply here, and the debugOut file should probably not be reopened over and over either.
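A minimal sketch of "open once": the observer owns an `std::ofstream` member that is opened in the constructor and closed by the destructor, so each batch is just a few stream insertions. `DebugLog` and `write_row` are hypothetical names; the row format mirrors the question's `IPCWriteTest.txt` output.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>

// Sketch: keep the debug file open for the observer's lifetime instead of
// calling open()/close() for every batch.
class DebugLog {
    std::ofstream out_;

public:
    explicit DebugLog(const std::string& path) : out_(path, std::ios::trunc) {}

    bool ok() const { return static_cast<bool>(out_); }

    // Writes one row: "runTime: <n> Data: v0 v1 ... " followed by '\n'
    // (not std::endl, so the stream is not flushed on every line).
    void write_row(int runTime, const float* values, std::size_t n) {
        out_ << "runTime: " << runTime << " Data: ";
        for (std::size_t i = 0; i < n; ++i) out_ << values[i] << ' ';
        out_ << '\n';
    }
    // The ofstream destructor flushes and closes the file once, at the end.
};
```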

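"Hard-looping" over triplets is odd. If it's a queue, just pop one message at a time. If a message "logically" consists of three floats, send messages that contain three floats. In fact, I now think this is a bug: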

            for (int i = 0; i < 3; i++) {
                data = getData();
                std::cout << data << "   ";
                DataArray[runTime] = data;
            }

It assigns three different values to the same index (runTime)...
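One way to send a logical triplet as a single message: wrap the three floats in a trivially copyable struct and size the queue's messages to `sizeof(Sample)`. `Sample`, `to_bytes` and `from_bytes` are illustrative names; the two helpers only mimic the raw byte copy that `message_queue::send`/`receive` perform, so the round trip can be checked without creating a queue.

```cpp
#include <cassert>
#include <cstring>
#include <type_traits>

// One logical sample = three floats, sent as ONE message instead of three.
struct Sample {
    float v[3];
};
static_assert(std::is_trivially_copyable<Sample>::value,
              "message_queue copies raw bytes, so the payload must be trivially copyable");

// With Boost.Interprocess the queue would be created with
// max_msg_size = sizeof(Sample), and the producer would call
//   mq.send(&s, sizeof(s), 0);
// These helpers mimic that raw-byte copy so the idea can be checked without IPC.
inline void to_bytes(const Sample& s, unsigned char (&buf)[sizeof(Sample)]) {
    std::memcpy(buf, &s, sizeof(Sample));
}

inline Sample from_bytes(const unsigned char (&buf)[sizeof(Sample)]) {
    Sample s;
    std::memcpy(&s, buf, sizeof(Sample));
    return s;
}
```

This cuts the number of queue operations per time step from three to one, and the consumer can no longer get out of step with the triplet boundaries.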

Simplified code

The producer code after my "review" (cleanup):

Live1 On Coliru

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <cstdlib>  // rand
#include <string>
#include <vector>

namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

struct QueueLogic {

    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

int main() {
    std::vector<float> pre_calculated;
    std::generate_n(std::back_inserter(pre_calculated), 10000*100, [] { return rand()%100; });

    auto start = Clock::now();

    try {
        QueueLogic instance;

        for (auto v : pre_calculated)
            instance.SetData(v);

    } catch(std::exception const& e) {
        std::cout << "Exception thrown: " << e.what() << "\n";
        bip::message_queue::remove("msgQData");
        throw;
    }

    auto end = Clock::now();
    std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}

Consumer code:

Live1 On Coliru

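#include <algorithm>  // std::copy
#include <fstream>
#include <iostream>
#include <iterator>   // std::ostream_iterator
#include <stdexcept>  // std::runtime_error
#include <vector>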

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;

struct ObserverLogic {

    bip::message_queue mqData{bip::open_only, "msgQData"};

    float getData() {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
                                  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10))) 
        {
            throw std::runtime_error("timeout in timed_receive");
        }

        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    DataArray.reserve(100000);

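    try {
        // open_only throws if the producer has not created the queue yet,
        // so construct the observer inside the try block
        ObserverLogic instance;

        while (DataArray.size() < 100000) {
            DataArray.push_back(instance.getData());
        }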
    } catch (std::exception const &e) {
        std::cout << "Exception caught: " << e.what() << "\n";
    }

    std::cout << "Received " << DataArray.size() << " messages\n";
    std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));

    std::cout << "\n\ndone" << std::endl;
}

Notes

Live1 - Shared memory is not allowed on Coliru

【Comments】:

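  • Adding more comments as I review the code: livecoding.tv/sehe
  • I added the code after the review (vid of the livestream). The code performs well. No synchronization is needed; if you still need it, just use a synchronization primitive instead of a full queue.
  • (If you want the debug output to a file again: coliru)
  • The code looks cool and I will try it; I had tried before, but the MSVC11 compiler doesn't support all C++11 features, so I will install a newer version to check. However, I suspect your code might also run slowly in my setup because of the producer. I need a producer that writes data to "memory/msgQ" at every time step: compute data -> save to memory -> compute new data -> save to memory, and so on. The second problem is that the consumer never knows how much data was saved, but that is another problem I can solve :)
  • The producer will have "two" functions: one initializes the memory block (opens the msgQ or whatever is needed), and the second is called x times and writes the data to memory at every time step. Say function two should run 100 times and save 100 variables to memory. If the code for some reason only runs 33 times, the data should still be in memory and accessible to the consumer. The producer writing to memory is where speed is critical; the consumer's processing time does not matter. I hope that all makes sense. I'll play with your code, it looks neat!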
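The layout described in the last two comments could look roughly like the sketch below: a fixed block sized for the maximum number of time steps (constructing it plays the role of "function one"), plus an atomic counter that `write_step` ("function two") increments only after a step's values are fully written. The consumer reads the counter with acquire ordering and then knows exactly how many complete steps exist, even if the run stopped after 33 of 100 steps. `SampleBlock`, `write_step` and `readable_steps` are hypothetical names; constructing the block inside shared memory (for example with `boost::interprocess::managed_shared_memory::construct`) is assumed and not shown, and it relies on `std::atomic<std::uint32_t>` being lock-free.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Fixed block filled once per time step, plus a published step count so the
// consumer always knows how many steps are complete.
template <std::size_t MaxSteps, std::size_t VarsPerStep>
struct SampleBlock {
    std::atomic<std::uint32_t> steps_written{0};
    float data[MaxSteps][VarsPerStep];

    // Producer: called once per time step.
    bool write_step(const float (&vars)[VarsPerStep]) {
        const std::uint32_t n = steps_written.load(std::memory_order_relaxed);
        if (n >= MaxSteps) return false;  // block full; caller decides what to do
        for (std::size_t i = 0; i < VarsPerStep; ++i) data[n][i] = vars[i];
        steps_written.store(n + 1, std::memory_order_release);  // publish after writing
        return true;
    }

    // Consumer: number of complete steps that are safe to read.
    std::uint32_t readable_steps() const {
        return steps_written.load(std::memory_order_acquire);
    }
};
```

The producer's hot path is a plain array write plus one atomic store, with no system call per sample.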
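【Solution 2】:

Please find my updated code below, compiled with MSVC14.

I have only one issue left: if I close the consumer while the producer is running, the producer seems to stop. I'm not sure why.

Producer: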

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <cstdio>   // printf
#include <cstdlib>  // rand
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>

namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct QueueLogic 
{
    //DataConfig Setup
    bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
    bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };

    bool SetDataConfig(float data) {
        return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }

    //Data Setup
    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };

    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};



int main() 
{
    time_t start, end;
    time(&start);

    float noVarsToMonitor = 10.f;
    float minTimeStep = 1.f;// 0.001f;

    std::vector<float> pre_calculated;
    std::vector<float> data_config;

    //Set Vars to monitor
    data_config.push_back(noVarsToMonitor); //Add noVars as first param in vector
    data_config.push_back(minTimeStep); //Add minTimeStep as second param in vector

    //Parse parameters into vector
    std::generate_n(std::back_inserter(pre_calculated), static_cast<size_t>(noVarsToMonitor), [] { return static_cast<float>(rand() % 100); });

    //Create instance of struct
    QueueLogic instance;

    //Setup data config
    try
    {       
        for (auto v : data_config)
        {
            instance.SetDataConfig(v);
        }
    }
    catch (std::exception const& e)
    {
        std::cout << "Exception thrown: " << e.what() << "\n";
        bip::message_queue::remove("msgQData");
        bip::message_queue::remove("msgDataConfig");
        throw;
    }

    //Get Data
    for (size_t i = 0; i < 1000; i++) //simulate that code will be called 1000 times after data is recalculated
    {
        try
        {

            for (auto v : pre_calculated)
            {
                instance.SetData(v);
            }
            std::cout << "case: " << i << std::endl;
            Sleep(20); //sleep to test code including consumer
        }
        catch (std::exception const& e)
        {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
        }
    }

    time(&end);
    double dif = difftime(end, start);
    printf("Elapsed time is %.2lf seconds.", dif);

    getchar();
}

Consumer:

#include <iostream>
#include <fstream>
#include <vector>
#include <cmath>      // floor
#include <stdexcept>  // std::runtime_error
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>

using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;

struct ObserverLogic 
{
    //Get Config Data
    bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };

    float getDataConfig()
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }

    //Get Var Data
    bip::message_queue mqData{ bip::open_only, "msgQData" };

    float getData() 
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
};

int main() {
    std::vector<float> DataArray;
    int count = 0; 
    float maxMonitorTime = 10.f;
    DataArray.reserve(100000);

    //Fetch this from Producer
    float noVarsToMonitor = 0.f; 
    float minTimeStep = 0.f;
    float maxSimSamples = 0.f;

    while (true)
    {
        try
        {
            ObserverLogic instance;
            DataArray.clear(); // start every round with an empty buffer
            count = 0;

            //Get Numbers of vars to monitor - used another thread!
            noVarsToMonitor = instance.getDataConfig();
            minTimeStep = instance.getDataConfig();
            maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;

            std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
            std::cout << "minTimeStep: " << minTimeStep << std::endl;
            std::cout << "maxSimSamples: " << maxSimSamples << std::endl;

            std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer

            //Get Var Data below here:
            try
            {
                while (DataArray.size() <= maxSimSamples)
                {
                    float value = instance.getData();
                    DataArray.push_back(value);
                    ofs << value << "; ";
                    count++;

                    if (count>noVarsToMonitor - 1) //Split Vector to match no Vars pr. Timestep
                    {
                        ofs << std::endl;
                        count = 0;
                    }
                }
                std::cout << "Received " << DataArray.size() << " messages\n";
                std::cout << "\n\ndone" << std::endl;
                std::cout << std::endl;
            }
            catch (std::exception const &e)
            {
                std::cout << "Exception caught: " << e.what() << "\n";
            }
        }
        catch (std::exception const &e)
        {
            std::cout << "Exception caught: " << e.what() << "\n";
        }
        std::cout << "Wait 5 seconds to try fetch again" << "\n";
        Sleep(5000); //sleep and wait to run loop again before looking at for msqQ
    }

    getchar();
}

Output to txt:

41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 

The output can then be plotted against "simulation time", with the data kept in the correct columns and rows.
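A small sketch of that row layout: given the flat stream of samples and the number of variables per time step, emit one line per time step in the same `v; v; ...; ` format as the file above. `format_rows` is a hypothetical helper; it writes `'\n'` rather than `std::endl` so the stream is not flushed on every row, and it replaces the manual `count` bookkeeping with a modulo test.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Lay out a flat sample stream as one row per time step (varsPerStep columns).
std::string format_rows(const std::vector<float>& samples, std::size_t varsPerStep) {
    assert(varsPerStep > 0);
    std::ostringstream os;
    for (std::size_t i = 0; i < samples.size(); ++i) {
        os << samples[i] << "; ";
        if (i % varsPerStep == varsPerStep - 1) os << '\n';  // end of a time step
    }
    return os.str();
}
```

A trailing incomplete time step is written without a final newline, which makes a truncated run easy to spot in the file.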

It may still not be pretty code, but I'm still learning, and thanks for the support I got on my first post. Feel free to comment.

【Comments】:

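  • (This is not an answer)
  • What are you trying to achieve? Again you are complicating the code with some kind of "out-of-band" configuration-passing mechanism. That's not good. If you really just want to send bursts of noVarsToMonitor values, why not send those bursts instead of loose samples? That would also, probably, increase the speed by a factor of ~noVarsToMonitor.
  • The code replicates my real code. My real code runs with a 0.001 ms time step; for every time step it computes noVarsToMonitor variables, which are sent to the consumer. Then it runs the next time step, computes noVarsToMonitor again and passes them to the consumer again. My for loop runs 1000 times to illustrate that I pass 10 recomputed variables 1000 times. I don't know how to explain it better, but does that make sense?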
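Following the suggestion to send each time step as one message instead of loose samples: since `timed_receive` reports `recvd_size`, the consumer can derive the number of values per step as `recvd_size / sizeof(float)`, which would remove the need for the separate `msgDataConfig` queue. The helpers below are a hypothetical sketch of the packing on both sides, assuming the queue is created with a maximum message size of at least `noVarsToMonitor * sizeof(float)`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Producer side: one time step -> one message payload.
// With Boost.Interprocess this would correspond to
//   mq.send(step.data(), step.size() * sizeof(float), 0);
std::vector<unsigned char> pack_step(const std::vector<float>& step) {
    std::vector<unsigned char> bytes(step.size() * sizeof(float));
    if (!step.empty()) std::memcpy(bytes.data(), step.data(), bytes.size());
    return bytes;
}

// Consumer side: recvd_size (as reported by receive/timed_receive) tells how
// many floats arrived, so no out-of-band "noVarsToMonitor" is needed.
std::vector<float> unpack_step(const unsigned char* buf, std::size_t recvd_size) {
    assert(recvd_size % sizeof(float) == 0);
    std::vector<float> step(recvd_size / sizeof(float));
    if (!step.empty()) std::memcpy(step.data(), buf, recvd_size);
    return step;
}
```

With 10 variables per step this means one queue operation per time step instead of ten; how much faster that is in practice would have to be measured.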