【问题标题】:thread that executes function calls from a main thread c++11从主线程c ++ 11执行函数调用的线程
【发布时间】:2018-02-15 06:37:43
【问题描述】:

我想实现一个线程,它可以接受来自主线程的函数指针并串行执行它们。我的想法是使用一个结构来保存函数指针及其对象并不断将其推送到队列中。这可以封装在一个类中。然后任务线程可以从队列中弹出并处理它。我还需要同步它(所以它不会阻塞主线程?),所以我正在考虑使用信号量。尽管我对程序的结构有一个不错的想法,但我在编写代码时遇到了麻烦,尤其是 C++11 中的线程和信号量同步。如果有人可以提出一个大纲,我可以通过它来实现这一点,那就太好了。

编辑:重复的问题回答了有关创建线程池的问题。看起来正在创建多个线程来做一些工作。我只需要一个线程就可以对函数指针进行排队并按接收顺序处理它们。

【问题讨论】:

  • '我还需要同步它(所以它不会阻塞主线程?'它不会。函数调用/返回不会改变线程上下文。'接受来自主线程的函数指针并执行它们串行'-代码不属于线程。如果有某个函数,任何线程都可以调用它。但是,为了清楚起见,如果从多个线程调用带有 main() 的单元中的函数,则应将其注释为 suc以便任何其他用户可以正确处理与函数外数据的任何交互。
  • 感谢您的评论。我了解代码不属于线程。我所说的串行执行函数的意思是线程一次只能处理一个函数。返回后,线程可以处理下一个函数。因此需要将 fn 指针存储在队列中。如果队列中没有项目,则线程需要休眠/挂起而不是旋转。为此,我需要一些机制,比如我猜的信号量?
  • 我想提供一个原型,但我无法回复,因为您的问题已被标记为重复
  • 我已编辑以澄清我的问题。希望有人能解锁。
  • @seccpur :我已经重新打开了。我同意这不是所列问题的重复。

标签: c++ multithreading


【解决方案1】:

检查这段代码 sn-p,虽然我没有使用类就实现了。看看有没有帮助。这里可以避免条件变量,但我希望读取器线程仅在有来自写入器的信号时才进行轮询,这样读取器中的 CPU 周期就不会被浪费。

#include <iostream>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>
#include <chrono>
#include <condition_variable>

using namespace std;

typedef function<void(void)> task_t;

queue<task_t> tasks;
mutex mu;
condition_variable cv;

bool stop = false;

void writer()
{
    while(!stop)
    {
        {
            unique_lock<mutex> lock(mu);
            task_t task = [](){ this_thread::sleep_for(chrono::milliseconds(100ms));   };
            tasks.push(task);
            cv.notify_one();
        }

        this_thread::sleep_for(chrono::milliseconds(500ms)); // writes every 500ms
    }
}

void reader()
{
    while(!stop)
    {
        unique_lock<mutex> lock(mu);
        cv.wait(lock,[]() { return !stop;});  
        while( !tasks.empty() )
        {

            auto task = tasks.front();            
            tasks.pop();
            lock.unlock();
            task();
            lock.lock();
        }

    }
}

int main()
{
    thread writer_thread([]() { writer();}  );
    thread reader_thread([]() { reader();}  );

    this_thread::sleep_for(chrono::seconds(3s)); // main other task

    stop = true;


    writer_thread.join();
    reader_thread.join();
}

【讨论】:

  • 这不会在执行任务时阻塞队列吗?您可能应该在task()lock.lock(); 之前调用lock.unlock(); 吗?
  • 或使用类似 reverse_lock {boost::reverse_lock&lt;decltype(lock)&gt; relocker; task(); }
  • 我在锁之前使用了一个范围(请参阅额外的花括号),以便它仅在推送操作期间锁定,而不是在长时间睡眠期间锁定
  • 是的,但是互斥体在执行任务时一直被消费者持有,您不能在任务执行时以这种方式添加新任务
  • 看这就是为什么线程很难。现在,如果在发送通知时消费者已经在处理任务,消费者可能不会收到通知。还是要while( !tasks.empty() ),完成任务后需要重新加锁
【解决方案2】:

您的问题有 2 个部分。存储作业列表并以线程安全的方式操作作业列表。

对于第一部分,请查看 std::functionstd::bindstd::ref

对于第二部分,这类似于生产者/消费者问题。您可以使用std::mutexstd::condition_variable实现信号量。

有一个提示/大纲。现在我的完整答案...

第 1 步)

将函数指针存储在 std::function 队列中。

std::queue<std::function<void()>>

队列中的每个元素都是一个不带参数并返回void的函数。

对于带参数的函数,使用std::bind 绑定参数。

void testfunc(int n);
...
int mynum = 5;
std::function<void()> f = std::bind(testfunction, mynum);

当 f 被调用时,即f()5 将作为参数 1 传递给testfuncstd::bind 立即按值复制 mynum

您可能还希望能够通过引用传递变量。这对于从函数获取结果以及传递共享同步设备(如信号量和条件)很有用。使用 std::ref,引用包装器。

void testfunc2(int& n);  // function takes n by ref
...
int a = 5;
std::function<void()> f = std::bind(testfunction, std::ref(a));

std::functionstd::bind 可以与任何可调用对象(函数、函子或 lambda)一起使用——这非常简洁!

第 2 步)

当队列非空时,工作线程出队。您的代码应该类似于生产者/消费者问题。

class AsyncWorker
{
    ...

public:
    // called by main thread
    AddJob(std::function<void()> f)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

private:
    worker_main()
    {
        while(!m_exitCondition)
            doJob();
    }

    void doJob()
    {
        std::function<void()> f;
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            while (m_numJobs == 0)
                m_condition.wait(lock);

            if (m_exitCondition)
                return;

            f = std::move(m_queue.front());
            m_queue.pop();
            --m_numJobs;
        }
        f();
    }

    ...

注 1: 同步代码...与m_mutexm_conditionm_numJobs...本质上是在 C++'11 中实现信号量所必须使用的.我在这里所做的比使用单独的信号量类更有效,因为只有 1 个锁被锁定。 (信号量有自己的锁,您仍然需要锁定共享队列)。

注意 2:您可以轻松添加额外的工作线程。

注意 3: 在我的示例中,m_exitCondition 是 std::atomic&lt;bool&gt;

实际上以多态方式设置AddJob函数进入C++'11可变参数模板和完美转发...

class AsyncWorker
{
    ...

public:
    // called by main thread
    template <typename FUNCTOR, typename... ARGS>
    AddJob(FUNCTOR&& functor, ARGS&&... args)
    {
        std::function<void()> f(std::bind(std::forward<FUNCTOR>(functor), std::forward<ARGS&&>(args)...));
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(f));
            ++m_numJobs;
        }
        m_condition.notify_one();  // It's good style to call notify_one when not holding the lock. 
    }

我认为如果您只使用按值传递而不是使用转发引用,它可能会起作用,但我没有对此进行测试,虽然我知道完美的转发效果很好。避免完美转发可能会使概念稍微不那么混乱,但代码不会有太大不同......

【讨论】:

  • 感谢您的详细解答。这看起来是经过深思熟虑的!
猜你喜欢
  • 1970-01-01
  • 2017-09-26
  • 2022-01-05
  • 1970-01-01
  • 2011-10-24
  • 1970-01-01
  • 2017-06-19
  • 2018-01-02
相关资源
最近更新 更多