【问题标题】:concurrent message processing ordered chronologically并发消息处理按时间顺序排列
【发布时间】:2015-02-17 03:44:53
【问题描述】:

我想在性能方面优化用 C++ 编写的消息解码器。解码器是完全按顺序设计的。实际并行化的概念很简单:

一旦新数据到达某个套接字,就告诉线程池运行另一个线程来解码接收到的消息。

在每个线程结束时,将调用一个方法(即发出一个Qt信号),并传递一个在处理过程中创建的对象。

我的问题是:处理的消息的长度和复杂性各不相同,因此线程完成的顺序可能与接收消息的顺序不同。换句话说,我需要在不使用线程安全容器的情况下就地序列化。

如何确保线程在完成后立即按正确的时间顺序调用上述方法,而不在线程安全容器中排队?

我的第一个想法是创建与线程池中的线程一样多的互斥锁,然后使用每个互斥锁将“已完成”信号从旧线程发送到新线程。

感谢任何cmets!

【问题讨论】:

  • 你能解释一下,为什么不能使用线程安全容器吗?

标签: c++ qt concurrency message-queue


【解决方案1】:

如果您真的不想使用诸如priority_queue 之类的数据结构或一系列预先保留的缓冲区并阻塞您的线程,您可以执行以下操作:

  1. 将您的消息与指示其原始索引的索引配对 定位并将其传递给线程池。
  2. 使用通用(例如全局、原子)计数器变量来指示最后处理的消息。
  3. 让每个线程等待,直到该变量指示上一条消息已被处理。
  4. 传递生成的对象并增加计数器

代码看起来像这样:

struct MsgIndexed {
    size_t idx;
    Msg msg;
};

//Single thread that receives all messages sequentially
void threadReceive() {
    for (size_t i = 1; true ; i++)
    {
        Msg m = readMsg();
        dipatchMsg(MsgIndexed{i,m});
    }
}

std::atomic<size_t> cnt=0;
//multiple worker threads that work in parallel
void threadWork() {
    while (1) {
        MsgIndexed msg = waitforMsg();
        Obj obj = processMsg(msg.msg);

        //Just for demonstration purposes. 
        //You probably don't want to use a spinlock here, but e.g. a condition variable instead     
        while (cnt != (msg.idx - 1u)) { std::this_thread::yield(); } 

        forwardObj(obj);
        cnt++;
    }
}

请注意,这是一个非常低效的解决方案,因为您的工作线程在完成实际工作后仍然需要等待。

【讨论】:

  • 感谢 MikeMB,这是一个了不起的解决方案。我唯一需要解决的是原子变量,因为我不能使用 C++11。我必须使用 Qt(QThread、QtConcurrent、QThreadPool 等)。也许我会使用 QWaitCondition。我是 Qt 中的并发新手,并且信号/槽机制在这种情况下表现得有些奇怪,我的印象是背景中有很多魔法。
  • 我没查过,但据我所知,c++11和QT并不是互斥的,所以应该可以使用atomics(不知道你是否需要尽管)。但是,我绝对同意您应该使用比自旋锁更高级的同步机制。顺便说一句:不要忘记验证您的新解决方案实际上比原来的解决方案更快。
  • 您对此有何看法:pastebin.com/7PXLLvTx 我使用了互斥锁和等待条件。您是否看到任何竞争条件或死锁?
  • 我认为您的代码应该按预期工作。我只是确保 processData() 非常简单并且不会锁定其他互斥锁。根据您在 DecoderTask 和接收处理对象的任何任务之间实现通信的方式,将 unlock() 直接放在 while 循环之后并用原子替换 seqProcessed 可能会更有效,甚至更不容易出错 - 假设 wakeAll( ) 不必受互斥体保护(c++11 的条件变量不需要)。但这是我的猜测,需要验证。
猜你喜欢
  • 2016-01-15
  • 2014-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-12-26
  • 2019-03-02
相关资源
最近更新 更多