【问题标题】:boost::interprocess message queue Race Condition on creation?boost::interprocess 消息队列创建时的竞争条件?
【发布时间】:2014-03-04 21:32:48
【问题描述】:

我正在尝试调试在 boost::interprocess 消息队列中发生的零星访问冲突。 (访问冲突读取共享内存区域中的地址)。

环境:boost 1.54,VC++2010。发生在调试和发布版本中。

它总是发生在 message_queue.hpp 中的第 854 行或附近(在接收的情况下): 评论是我添加的

      recvd_size     = top_msg.len; // top_msg points to invalid location

或第 756 行(在发送的情况下)

BOOST_ASSERT(free_msg_hdr.priority == 0); // free_msg_hdr points to invalid location

这似乎与消息队列的创建有关。如果“正确”创建消息队列(即没有可能的竞争条件),则永远不会发生错误。 否则它可能会在看似随机的时间发生在队列上的 timed_receive() 或 timed_send() 上。

我想出了一个代表问题的简短示例: 不幸的是,我无法在 Coliru 上运行它,因为它需要两个进程。 一个必须在没有任何参数的情况下启动,第二个必须使用任何单个参数。 多次运行后,其中一个进程将在 message_queue 中崩溃。

#include <iostream>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <boost/assert.hpp>
#include <boost/date_time.hpp>

using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; // microsec_clock is ambiguous between boost::posix_time and boost::interprocess. What are the odds?

int main(int argc, wchar_t** argv)
{
    while(true)
    {
        int proc = 0;
        message_queue* queues[2] = {NULL, NULL};
        std::string names[] = {"msgq0", "msgq1"};
        if(1 == argc)
        {
            proc = 0;
            message_queue::remove(names[0].c_str());
            if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
            queues[0] = new message_queue(open_or_create, names[0].c_str(), 128, 10240);

            bool bRet = false;
            do
            {
                try
                {
                    if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
                    queues[1]=new message_queue(open_only, names[1].c_str());
                    bRet = true;
                }
                catch(const interprocess_exception&)
                {
                    //boost::this_thread::sleep(boost::posix_time::milliseconds(2));
                    delete queues[1];
                    queues[1] = NULL; 
                    continue;
                }
            }while(!bRet);

        }
        else
        {
            proc = 1;
            message_queue::remove(names[1].c_str());
            if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
            queues[1] = new message_queue(open_or_create, names[1].c_str(), 128, 10240);

            bool bRet = false;
            do
            {
                try
                {
                    if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
                    queues[0]=new message_queue(open_only, names[0].c_str());
                    bRet = true;
                }
                catch(const interprocess_exception&)
                {
                    //boost::this_thread::sleep(boost::posix_time::milliseconds(2));
                    delete queues[0];
                    queues[0] = NULL;
                    continue;
                }
            }while(!bRet);
        }

        long long nCnt = 0;
        for(int i = 0; i < 1; ++i)
        {
            if(proc)
            {
                std::string sOut;
                sOut = "Proc1 says: Hello ProcA " + std::to_string(nCnt) + " ";
                sOut.resize(10230, ':');
                for(int n = 0; n < 3; ++n)
                {
                    queues[1]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                }

                bool bMessage = false;
                for(int n = 0; n < 3; ++n)
                {
                    size_t nRec; unsigned int nPrio;
                    std::string sIn; sIn.resize(10240);
                    bMessage = queues[0]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                    if(bMessage)
                    {
                        sIn.resize(nRec);
                        //std::cout << sIn << " ";
                    }
                }
                if(bMessage)
                {
                    //std::cout << std::endl;
                }
            }
            else
            {
                std::string sOut;
                sOut = "Proc0 says: Hello Procccccccdadae4325a " + std::to_string(nCnt);
                sOut.resize(10240, '.');
                for(int n = 0; n < 3; ++n)
                {
                    queues[0]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                }

                bool bMessage = false;
                for(int n = 0; n < 3; ++n)
                {
                    size_t nRec; unsigned int nPrio;
                    std::string sIn; sIn.resize(10240);
                    bMessage = queues[1]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                    if(bMessage)
                    {
                        sIn.resize(nRec);
                        //std::cout << sIn << " ";
                    }
                }
                if(bMessage)
                {
                    //std::cout << std::endl;
                }
            }

            nCnt++;
            boost::this_thread::sleep(boost::posix_time::milliseconds(10));
        }
    }
    return 0;
}

我仍然在想我可能做错了什么,因为我在其他任何地方都找不到关于这个问题的任何信息,而且 boost 库通常非常好。

在这个例子中我使用 message_queue 有什么问题吗?

【问题讨论】:

    标签: c++ boost message-queue race-condition interprocess


    【解决方案1】:

    我不认为 both 使用open_or_create 的进程是受支持的习惯用法。你知道this thread on the mailing list吗?我找不到更多讨论,所以在我看来,终生管理最终被认为没有必要添加。

    因此,您需要手动将创建与boost::interprocess 同步,或者可能通过让其中一个进程重试到open_only 队列直到另一个进程创建它。

    【讨论】:

    • 谢谢!这就是示例正在做的事情。正如您所建议的,每个进程都会创建传出队列(open_or_create)并使用 open_only 旋转另一个。但是,只是由于您的帖子和链接的预感,我尝试使用 create_only 而不是 open_or_create。我目前正在测试这个,我稍后会回来。
    • 好的,经过一些快速测试,create_only 以及 open_only 的 catch 块中的 sleep(1000) 似乎可以将问题缓解到不会在生产应用程序中发生的程度。但是,在测试应用程序中,我仍然可以重现它,尽管频率要低得多。因此,似乎在创建/打开需要解决的消息队列方面存在竞争。令人惊讶的是,这显然从未在任何地方出现过,尽管关于所用锁的讨论让我对它的使用感到有点反感。
    • 如果这种情况再次发生,我可能会四处寻找其他 IPC 队列库,但目前上述“解决方案”似乎产生了可接受的结果,至少在生产应用程序中是这样。我将把这篇文章链接到 boost bugtracker;也许有人在那里有一些意见。
    猜你喜欢
    • 2021-04-20
    • 2021-09-24
    • 2019-04-08
    • 2019-07-04
    • 2013-04-27
    • 2020-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多