【问题标题】:Maximum Size that a Boost Message Queue can Handle?Boost 消息队列可以处理的最大大小?
【发布时间】:2014-09-04 03:26:18
【问题描述】:

Boost 消息队列可以处理的最大大小是多少?

当我使用 Boost 二进制序列化发送和接收以下结构时,它工作正常。

typedef struct  {
int  msg_type;
char msg_name[100];

union {
    struct {
        int     ID;
        std::string    ReportedTime;
        char    ReceivedAt[200];
        int     Number;
        int     Priority;



    } mess1;

    struct  {
        char    host_ip[20];
        char    mac_addr[30];
        char    time_stamp[100];
    } mess2;
} struct_type;
}msg_struct;

#include <boost/serialization/is_bitwise_serializable.hpp> BOOST_IS_BITWISE_SERIALIZABLE(msg_struct)

但是当我将ReceivedAt[2000] 的大小设置为 2000 或添加一个新的 char 数组变量时。

它抛出以下异常并且核心被转储。

terminate called after throwing an instance of 'boost::interprocess::interprocess_exception' what(): boost::interprocess_exception::library_error

Aborted (core dumped)

编辑

发送

int  ControlQueue::pushMessage(msg_struct* msg)
{
int q_size = vamsgq->get_num_msg();
int retVal = 0;


    lockQueue();

//imp->ID=1;
//strcpy(imp->ReceivedAt,"10-07-14");


    std::stringstream oss;

    boost::archive::binary_oarchive oa(oss);
    strncpy(msg->msg_name,"msg_name",sizeof(msg->msg_name));

   oa << boost::serialization::make_array(msg, 1);


    std::string serialized_string(oss.str());

    vamsgq->send(serialized_string.data(), serialized_string.size(), 0);

    std::cout <<"\n sendig type="<< msg->msg_name << std::endl;


    if((retVal = pthread_cond_signal(&m_qCondSignal)) != 0)
    {
       cout<<__FILE__<<__LINE__<<
        "{ControlQueue %x} Unable to send Cond Signal",this);
    }

    unlockQueue();


return 1;
}

收到:

msg_struct* ControlQueue::getMsg()
{
int retVal = 0;
message_queue::size_type recvd_size;
unsigned int priority;
lockQueue();

while(vamsgq->get_num_msg()==0)
{
    if((retVal = pthread_cond_wait(&m_qCondSignal, &m_qMutex)) != 0)
    {
        cout<<__FILE__<<__LINE__<<"getMsg {ControlQueue } Unable to Cond Signal";
        unlockQueue();
        return NULL;
    }
}
msg_struct *l_msg_struct = NULL;

if(vamsgq->get_num_msg())
{

l_msg_struct=new msg_struct();

std::stringstream iss;
std::string serialized_string;
serialized_string.resize(MAX_SIZE);

vamsgq->receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
iss << serialized_string;

boost::archive::binary_iarchive ia(iss);
ia >> boost::serialization::make_array(l_msg_struct, 1);


 std::cout <<"Recieving="<< l_msg_struct->msg_name << std::endl;

}
else
{
    cout<<__FILE__<<__LINE__<<"getMsg {ControlQueue } m_MsgQ empty..";
}

unlockQueue();

return l_msg_struct;

}

发送和接收都在不同的线程中运行,我得到的唯一问题是在增加结构的大小或添加 char 数组变量之后。

vamsgq-&gt;send() 方法抛出异常,创建方法(消息队列)工作正常。而且我还在增加消息队列要存储的消息的大小。

是否有任何关于 boost::message_queue 的完整在线文档。

【问题讨论】:

  • 你能发布一个最小的 SSCCE 吗?我们无法从心理上获取异常信息。您将不得不处理异常或在抛出站点跟踪程序流/断点以查看失败的位置。
  • @sehe 谢谢。我认为它正在发生,因为我正在使用两个并行工作的线程,一个用于发送,一个用于接收,IPC 用于进程间通信。但我正在使用适当的侦听器来激活线程 2 来读取。它适用于小数据结构。

标签: c++ boost message-queue


【解决方案1】:
  1. 你违反了

    的要求
    BOOST_IS_BITWISE_SERIALIZABLE(msg_struct)
    

    不再将 msg_struct 保留为 POD 类型:

    static_assert(boost::is_pod<msg_struct>::value, "msg_struct must be POD");
    

    将无法编译。 (事实上​​,使用std::string 显示的msg_struct,我认为默认构造应该无法编译,所以您示例中的new msg_struct() 行让我感到困惑)。

  2. 另外,您没有显示消息队列的构造,所以我不知道您是如何确定最大消息大小的。这可能尺寸不够。有关使用/检查大小限制的两种方法,请参见下文。

以下是可行的:

[方案A]所有POD数据,无序列化

#include <boost/serialization/serialization.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

typedef struct {
    int  msg_type;
    char msg_name[100];

    union {
        struct {
            int  ID;
            char ReportedTime[100];
            char ReceivedAt[2000];
            int  Number;
            int  Priority;
        } mess1;

        struct  {
            char host_ip[20];
            char mac_addr[30];
            char time_stamp[100];
        } mess2;
    } struct_type;
} msg_struct;

static_assert(boost::is_pod<msg_struct>::value, "msg_struct must be POD");
#include <boost/serialization/is_bitwise_serializable.hpp>
BOOST_IS_BITWISE_SERIALIZABLE(msg_struct)

namespace ipc = boost::interprocess;

int main() {
    ipc::message_queue queue(ipc::open_or_create, "myqueue", 100, sizeof(msg_struct));

    msg_struct outgoing;
    outgoing.msg_type = 1;
    strncpy(outgoing.msg_name, "outgoing.msg_name", sizeof(outgoing.msg_name));

    outgoing.struct_type.mess1.ID = 42;
    strncpy(outgoing.struct_type.mess1.ReportedTime, "outgoing.struct_type.mess1.ReportedTime", sizeof(outgoing.struct_type.mess1.ReportedTime));
    strncpy(outgoing.struct_type.mess1.ReceivedAt, "outgoing.struct_type.mess1.ReceivedAt", sizeof(outgoing.struct_type.mess1.ReceivedAt));
    outgoing.struct_type.mess1.Number = 123;
    outgoing.struct_type.mess1.Priority = 234;

    queue.send(&outgoing, sizeof(outgoing), 1);
}

如您所见,这没有使用 Boost 序列化,因为该结构无论如何都是 POD。

[解决方案 B] 高级 C++ 类型,使用二进制序列化

或者,您可以一直使用 Boost Serialization 在发送时检查消息大小是否不超过最大消息大小:

#include <boost/serialization/serialization.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/serialization/variant.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

typedef struct {
    int  msg_type;
    std::string msg_name;

    struct mess1_t {
        int  ID;
        std::string ReportedTime;
        std::string ReceivedAt;
        int  Number;
        int  Priority;
    private:
        friend class boost::serialization::access;
        template <typename Ar>
            void serialize(Ar& ar, unsigned)
            {
                ar & ID;
                ar & ReportedTime;
                ar & ReceivedAt;
                ar & Number;
                ar & Priority;
            }
    };

    struct mess2_t {
        std::string host_ip;
        std::string mac_addr;
        std::string time_stamp;
    private:
        friend class boost::serialization::access;
        template <typename Ar>
            void serialize(Ar& ar, unsigned)
            {
                ar & host_ip;
                ar & mac_addr;
                ar & time_stamp;
            }
    };

    boost::variant<mess1_t, mess2_t> message_data;

private:
    friend class boost::serialization::access;
    template <typename Ar>
        void serialize(Ar& ar, unsigned)
        {
            ar & msg_type;
            ar & msg_name;
            ar & message_data;
        }
} msg_struct;

namespace ipc = boost::interprocess;

int main() {
    ipc::message_queue queue(ipc::open_or_create, "myqueue", 100, 4*1024);

    msg_struct outgoing { 1, "outgoing.msg_name", msg_struct::mess1_t {
        42,
        "outgoing.struct_type.mess1.ReportedTime",
        "outgoing.struct_type.mess1.ReceivedAt",
        123,
        234 } 
    };

    std::ostringstream oss;
    boost::archive::binary_oarchive oa(oss);

    oa << outgoing;

    assert(oss.str().size() <= queue.get_max_msg_size());
    queue.send(&outgoing, sizeof(outgoing), 1);
}

【讨论】:

  • 顺便说一下,std::stringinside 按位序列化的结构也是 一个真正的问题。你需要解决这个问题。
  • 在解释无聊的代码/数据时很难做到 100% 肯定!
  • @MartinJames 你看到不同的 cmets 了吗?顺便说一句,我对我的代码很满意。
  • 问题出在大小上。我没有删除以前创建的消息队列,当我调用 open_or_create 时,它​​正在打开以前创建的较小大小的队列。因此我收到此错误。
  • @ali786 这有点道理。在某种程度上,它是关于队列的版本控制。您可能应该对这种情况进行一些检测(并开始为您的消息格式使用版本指示器,因为这可能是更改实现细节时发生的下一件事)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-07-10
  • 1970-01-01
  • 2021-10-10
  • 2011-08-19
  • 1970-01-01
  • 2011-08-06
  • 2010-09-20
相关资源
最近更新 更多