【问题标题】:C/C++ pthread signals and pointersC/C++ pthread 信号和指针
【发布时间】:2013-07-19 21:04:24
【问题描述】:

我正忙于思考如何让线程相互发送信号。

我的设计:

主函数创建一个主线程来协调一堆其他工作线程。 main 函数还创建了工作线程,因为工作线程以 main 中编程的时间间隔产生和退出。主线程需要能够向这些工作线程发出信号并对它们进行signal_broadcast,而工作线程必须向主线程发回信号(pthread_cond_signal)。由于每个线程都需要一个 pthread_mutex 和 pthread_cond 我用这些变量创建了一个 Worker 类和一个 Master 类。现在这就是我卡住的地方。 C++ 不允许您将成员函数作为 pthread_create(...) 处理程序传递,因此我必须在内部创建一个静态处理程序并将一个指向自身的指针传递给 reinterpret_cast 它以使用其类数据...

void Worker::start() {
pthread_create(&thread, NULL, &Worker::run, this);
}

void* Worker::run(void *ptr) {
Worker* data = reinterpret_cast<Worker*>(ptr);
}

我遇到的问题(可能是错误的)设置是,当我将一组工作指针传递给主线程时,它会发出一个不同的工作引用信号,因为我认为演员做了某种复制。所以我尝试了 static_cast 和相同的行为。

我只需要某种设计,Master 和 worker 可以相互 pthread_cond_wait(...) 和 pthread_cond_signal(...)。

编辑 1

添加:

private:
    Worker(const Worker&);

还是不行。

【问题讨论】:

  • 你的初创公司看起来不错 - reinterpret_caststatic_cast 在这种情况下应该做同样的事情,因为你是从 void* 铸造的。我认为可能的问题是你有pthread_mutex_t 成员变量,你在初始化后复制/移动,这是严格禁止的。您是否在 Master/Worker 课程中禁用了复制构造?
  • 我尝试了但是:private Worker(const Worker &);工人操作员=(const Worker &);给我错误:'Worker Worker::operator=(const Worker&)' 是私有的
  • 如果您将指针数组传递给主线程,则不应调用复制构造函数。
  • @user622469 这不正是您所期望的吗?将其设为私有正是为了发生此错误。该错误显示(在编译时)正在该位置调用 Worker 的赋值运算符。你需要解决这个问题
  • 通常按值传递/返回或直接存储在容器中是陷阱。

标签: c++ multithreading pthreads


【解决方案1】:

编辑修复了所有版本中的潜在比赛:

1./1b 使用由C++0x has no semaphores? How to synchronize threads?中所述的(互斥+条件+计数器)构建的信号量
2. 使用“反向”等待来确保信号被预期的工作人员确认

我真的建议使用 c++11 样式 &lt;thread&gt;&lt;condition_variable&gt; 来实现这一点。

我有两个(半个)演示。他们每个人都假设您有 1 个 master 驱动 10 个 worker。每个工作人员在工作之前都会等待一个信号。

我们将使用std::condition_variable(与std::mutex 结合使用)来发出信号。第一版和第二版的区别在于信号的完成方式:

  • 1。一次通知任何员工:
  • 1b。使用工人结构
  • 2。通知所有线程,协调哪个接收工作人员响应

1。一次通知任何工作人员:

这是最简单的做法,因为几乎没有协调:

#include <vector>
#include <thread>
#include <mutex>
#include <algorithm>
#include <iostream>
#include <condition_variable>

using namespace std;

class semaphore 
{ // see https://stackoverflow.com/questions/4792449/c0x-has-no-semaphores-how-to-synchronize-threads
    std::mutex mx;
    std::condition_variable cv;
    unsigned long count;
public:
    semaphore() : count() {} 
    void notify();
    void wait();
};

static void run(int id, struct master& m);

struct master
{
    mutable semaphore sem;

    master()
    {
        for (int i = 0; i<10; ++i)
            threads.emplace_back(run, i, ref(*this));
    }

    ~master() {
        for(auto& th : threads) if (th.joinable()) th.join(); 
        std::cout << "done\n";
    }

    void drive()
    {
        // do wakeups
        for (unsigned i = 0; i<threads.size(); ++i)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%100));
            sem.notify();
        }
    }

  private:
    vector<thread> threads;
};

static void run(int id, master& m)
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

int main()
{
    master instance;
    instance.drive();
}

/// semaphore members
void semaphore::notify()
{
    lock_guard<mutex> lk(mx);
    ++count;
    cv.notify_one();
}

void semaphore::wait()
{
    unique_lock<mutex> lk(mx);
    while(!count)
        cv.wait(lk);
    --count;
}

1b。使用工人结构

注意,如果你有 worker 类和 worker::run 一个非静态成员函数,你可以做同样的小修改:

struct worker
{
    worker(int id) : id(id) {}

    void run(master& m) const;

    int id;
};

// ...
struct master
{
    // ...

    master()
    {
        for (int i = 0; i<10; ++i)
            workers.emplace_back(i);

        for (auto& w: workers)
            threads.emplace_back(&worker::run, ref(w), ref(*this));
    }

// ...

void worker::run(master& m) const
{
    m.sem.wait();
    {
        static mutex io_mx;
        lock_guard<mutex> lk(io_mx);
        cout << "signaled: " << id << "\n";
    }
}

警告

  • cv.wait() 可能会遭受虚假唤醒,其中条件变量实际上没有被提高(例如,在操作系统信号处理程序的情况下)。这是任何平台上的条件变量都会发生的常见事情。

以下方法解决了这个问题:

2。通知所有线程,协调哪个接收者工作人员

使用标志来指示哪个线程打算接收信号:

struct master
{
    mutable mutex mx;
    mutable condition_variable cv;
    int signaled_id;               // ADDED

    master() : signaled_id(-1)
    {

让我们假设driver 变得更有趣,并希望以特定(随机...)顺序向所有工作人员发出信号:

    void drive()
    {
        // generate random wakeup order
        vector<int> wakeups(10);
        iota(begin(wakeups), end(wakeups), 0);
        random_shuffle(begin(wakeups), end(wakeups));

        // do wakeups
        for (int id : wakeups)
        {
            this_thread::sleep_for(chrono::milliseconds(rand()%1000));
            signal(id);
        }
    }

  private:
    void signal(int id)                // ADDED id
    {
        unique_lock<mutex> lk(mx);

        std::cout << "signaling " << id << "\n";

        signaled_id = id;              // ADDED put it in the shared field
        cv.notify_all();

        cv.wait(lk, [&] { return signaled_id == -1; });
    }

现在我们要做的就是确保接收线程检查它的 id 匹配:

m.cv.wait(lk, [&] { return m.signaled_id == id; });
m.signaled_id = -1;
m.cv.notify_all();

这结束了虚假唤醒。

完整的代码清单/现场演示:

【讨论】:

  • 我认为带有signaled_idsignal() 方案存在竞争条件,即主线程可以在最后一个发出信号的线程有机会读取signal_id 之前发出第二个线程的信号。这种情况在测试场景中可能看不到,但如果您不采取具体措施来防止它,几乎可以肯定会在现实世界的运行中看到。类似地,如果一个线程在调用condition_variable::wait()(同时持有锁)之前没有检查signaled_id,它会错过它的信号。
  • @MichaelBurr 由于wait 在等待期间锁定/解锁互斥锁的方式,这不会发生。请参阅 1) 下的 std::condition_variable::wait(第二个可能是一个问题,具体取决于应用程序。您可以通过在线程创建之前/期间锁定互斥锁来轻松修复它。)
  • 既然您调用notify_all(),我认为可能会释放多个等待线程。当目标线程等待获取互斥锁时,主线程可能会出现,获取互斥锁并更改signaled_id。对于在互斥体上等待的线程,无法保证 FIFO 行为。严格来说,即使只有一个线程在等待,主线程也可以在未被阻塞的等待线程有机会获得互斥锁并从wait()调用返回之前重新获得互斥锁。
  • 嗯。我不再那么确定了。确实没有明确的保证。我可能已经重新发现为什么每个理智的人都使用队列和/或信号量来完成这样的任务。 公平地说,我显然将与 OP 相关的要求过于复杂,所以我可能只是简化,但现在让我修复它
  • @MichaelBurr 我现在已经修复了所有三个变体。前两个带有适当的信号量。第二个暂时通过反向等待,直到工作人员ACK-s 发出信号。 当然,对此的适当队列解决方案会更灵活,但我已经 waaaay 超出了问题的范围。我还有什么遗漏吗?跨度>
【解决方案2】:

目前尚不清楚您的具体情况是什么,但您似乎正在使用一个容器来保存在main 中创建的“工人”实例,并将它们传递给您的“主人”。如果是这种情况,您可以使用一些补救措施。您需要选择一个适合您的实施。

  • main中容器的引用传递给Master。
  • 更改容器以保存指向工人的(智能)指针。
  • 使容器成为“Master”本身的一部分,这样就不需要传递给它。
  • 为您的 Worker 类实现适当的析构函数、复制构造函数和赋值运算符(换句话说,遵守 Rule of Three)。

从技术上讲,由于pthread_create() 是一个C API,因此传递给它的函数指针需要具有C 链接(extern "C")。你不能让 C++ 类的方法有 C 链接,所以你应该定义一个外部函数:

extern "C" { static void * worker_run (void *arg); }

class Worker { //...
};

static void * worker_run (void *arg) {
    return Worker::run(arg);
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多