【问题标题】:Is there an existing way to serialize invocations of a boost::signals2 signal?是否有现有的方法来序列化 boost::signals2 信号的调用?
【发布时间】:2025-11-25 20:40:02
【问题描述】:

我想序列化 boost::signals2 信号的多线程调用,以确保来自对象的状态更改通知以明确定义的顺序到达插槽。

背景

我在多线程程序中有一个具有内部状态的对象。内部状态的某些部分对程序的其他部分很感兴趣,并且对象通过使用 boost::signals2 信号来暴露状态更改,类似于:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
        }
        m_OnStateChanged(newState);
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
};

问题

如果有多个 OnEvent 处理程序的并发调用,这可能会导致侦听器以不同于实际发生更改的顺序收到有关状态更改的通知。状态本身由上面的互斥体保护,因此 实际 状态是明确定义的。但是,互斥锁不能在对信号的调用中保持,因为这可能导致死锁。这意味着信号的实际调用可能以任何顺序发生,而我会要求它们以与实际发生状态更改的顺序相同的顺序被调用。

处理此问题的一种方法是从信号中删除状态,并仅通知侦听器状态已更改。然后,他们可以查询对象的状态,并获取对象在触发信号时所具有的状态或以后的状态。在我的场景中,需要通知侦听器所有状态变化,所以这个方法在这里不起作用。

我的下一个方法如下:

class ObjectWithState {
public:
    enum State {
        STATE_A,
        STATE_B,
        STATE_C,
    };

    void OnEvent() {
        State newState;
        boost::unique_future<void> waitForPrevious;
        boost::shared_ptr<boost::promise<void> > releaseNext;
        {
            boost::lock_guard<boost::mutex> lock(m_Mutex);
            // Process event and change state
            m_State = ...;
            newState = m_State;
            waitForPrevious = m_CurrentInvocation->get_future();
            m_CurrentInvocation.reset(new boost::promise<void>());
            releaseNext = m_CurrentInvocation;
        }
        // Wait for all previous invocations of the signal to finish
        waitForPrevious.get();

        // Now it is our turn to invoke the signal
        // TODO: use try-catch / scoped object to release next if an exception is thrown
        OnStateChanged(newState);

        // Allow the next state change to use the signal
        releaseNext->set_value();
    }

    // method to allow external objects to connect to the signal etc
private:
    boost::signals2::signal<void (State) > m_OnStateChanged;
    boost::mutex m_Mutex;
    State m_State;
    // Initialized with a "fulfilled" promise in the constructor
    // or do special handling of initially empty promise above
    boost::shared_ptr<boost::promise<void> > m_CurrentInvocation;
};

我没有尝试过上面的代码,所以它可能充满了错误和编译错误,但应该可以推断出我所追求的。我的直觉告诉我,我不是第一个遇到此类问题的人,我更喜欢使用经过验证的代码而不是我自己的... :) 所以我的问题是:

是否有预先存在的方法来实现对 boost::signals2 信号的序列化调用(例如内置到 signals2 库或通用模式中)?

【问题讨论】:

  • “但是,互斥锁不能在信号调用期间保持,因为这可能导致死锁。”你能澄清一下吗?某些信号处理程序可以更改ObjectWithState 的状态吗?他们会产生新的OnEvent() 电话吗?如何确保不进入无限递归?
  • @user1202136:是的。递归调用将是导致死锁的一种方法。一种更狡猾的方法是,如果您有两个 ObjectWithState 并为状态更改设置一个处理程序,在某些情况下会触发另一个状态更改。这导致一个线程持有第一个互斥锁并尝试锁定另一个,另一个线程持有另一个互斥锁并尝试锁定第一个。因此,一般准则是在调用未知代码时永远不要持有锁,请参阅drdobbs.com/article/… 进行讨论。

标签: c++ multithreading future boost-signals2


【解决方案1】:

我提出以下解决方案。创建一个待处理信号队列并让一个单独的线程调度它们。代码大致如下:

class ObjectWithState {
private:
    bool running;
    std::queue<State> pendingSignals;
    boost::condition_variable cond;
    boost::mutex mut;

    void dispatcherThread()
    {
        while (running)
        {
            /* local copy, so we don't need to hold a lock */
            std::vector<State> pendingSignalsCopy;

            /* wait for new signals, then copy them locally */
            {
                boost::unique_lock<boost::mutex> lock(mut);
                cond.wait(mut);
                pendingSignalsCopy = pendingSignals;
                pendingSignals.clear();
            }

            /* dispatch */
            while (!pendingSignalsCopy.empty())
            {
                State newState = pendingSignalsCopy.front();
                OnStateChanged(newState);
                pendingSignalsCopy.pop();
            }
        }
    }

public:
    void OnEvent()
    {
        State newState;
        ...

        /* add signal to queue of pending signals and wake up dispatcher thread */
        {
            boost::unique_lock<boost::mutex> lock(mut);
            pendingSignals.push(state);
            cond.notify_all();
        }
    }
};

【讨论】:

  • 这将部分实现我想要的相同的东西,但是我对为每个对象专用一个单独的线程并不那么兴奋。所以接下来的事情就是为所有对象共享一个线程——但是为此专门使用一个线程仍然有点尴尬,所以你希望在你的线程池中分配工作。此外,此实现不保证在 OnStateChanged 函数退出时已通知所有侦听器,这在某些情况下可能是一个问题(但在其他情况下是一个功能)。
  • "另外,这个实现并不能保证在 OnStateChanged 函数退出时所有的监听器都得到通知" 为什么不呢? IIRC,boost::signals2::signal::operator() 遍历所有插槽,然后才返回。此外,您的提议基本上阻止了 waitForPrevious.get() 中的所有线程(除了一个)。因此,实际上最好为通知专门设置一个线程。
  • 将 OnStateChanged 替换为 OnEvent,很抱歉造成混淆。阻塞的区别正是我要指出的,在某些情况下阻塞线程是必要的,以确保通知已经发生,在其他情况下,它可以让线程继续前进。
最近更新 更多