【问题标题】:C++ Main thread notifying threads notifying main threadC++ 主线程通知线程 通知主线程
【发布时间】:2015-10-08 16:42:46
【问题描述】:

我需要帮助。我正在尝试做一些具体的事情,但我缺乏多线程技能正在杀死我。

基本上,我的主程序/线程需要管理许多必须运行多次的“通道”。由于这些运行是独立的,因此每个通道都包含一个执行它们的线程。

所以主线程必须等待所有通道(线程)完成它们的运行才能启动下一个。 并且所有通道都必须等待来自主线程的通知才能运行。

以下是我的做法 - 抱歉有点长!

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>

std::mutex              g_lockprint;
std::mutex              g_lockbatch;
std::condition_variable g_nextbatch;
std::mutex              g_lockready;
std::condition_variable g_ready;

int global_id = 0;
int nbChannels = 5;
std::atomic<int> nbChannelsLeftToEnd;

class Channel {

private:

    int _id;
    std::thread _th;
    std::atomic<bool> next_batch;
    std::atomic<bool> stop_th;

public:

    Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {}

    void go_for_next_batch() { next_batch = true; }

    void start(int& start, int &end){
        _th = std::thread(&Channel::run, this, std::ref(start), std::ref(end));
    }

    void stop(){
        stop_th = true;
        _th.join();
    }

    void run(int& start, int& end){
        while (!stop_th){
            {
                std::unique_lock<std::mutex> locker(g_lockbatch);
                g_nextbatch.wait(locker, [&](){return (next_batch==true); });
            }

            // print a starting message
            {
                std::unique_lock<std::mutex> locker(g_lockprint);
                std::cout << "[channel " << _id << "]\trunning in [" << start << "," << end << "]" << std::endl;
            }

            // simulate work
            std::this_thread::sleep_for(std::chrono::seconds(1));

            // update the number of channels left to run
            nbChannelsLeftToEnd--;
            g_ready.notify_one();
            next_batch = false;
        }
    }
};

int main()
{
    int end = 100;
    int batch = 10;
    int startBatch = 0;
    int endBatch = startBatch + batch;

    // declare some channels (threads)
    std::vector<Channel> channels(nbChannels);

    // start the threads
    for (auto& ch : channels) ch.start(startBatch, endBatch);

    while (endBatch<=end){
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[main]\trunning in [" << startBatch << "," << endBatch << "]" << std::endl;
        }
        nbChannelsLeftToEnd = nbChannels;
        for (auto& ch : channels) ch.go_for_next_batch();
        g_nextbatch.notify_all();

        std::unique_lock<std::mutex> locker(g_lockready);
        g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); });

        startBatch += batch;
        endBatch += batch;
    }

    for (auto& ch : channels) ch.stop();

    return 0;
}

但有时程序会阻塞,可能是线程相互等待,但我不明白为什么。 在任何情况下,加入线程(main 末尾的“stop”方法)会使我的程序无限期地运行,也不明白为什么。

编辑:感谢您的 cmets 和一些研究,我设法使用同步屏障获得了一个工作程序,因此主线程可以等待所有其他线程完成当前批处理,然后再告诉它们开始下一个。 我重用了此处引用 Anthony Wiiliams's book 的某人的屏障代码 - 这是 屏障

class barrier
{
    unsigned const count;
    std::atomic<unsigned> spaces;
    std::atomic<unsigned> generation;

public:
    explicit barrier(unsigned count_) :
        count(count_), spaces(count), generation(0) {}

    void wait()
    {
        unsigned const my_generation = generation;
        if (!--spaces)
        {
            spaces = count;
            ++generation;
        }
        else
        {
            while (generation == my_generation)
                std::this_thread::yield();
        }
    }
};

这是使用屏障的 Channel 类的新 run 方法 - 请注意对“stop_th”标志的附加测试。当线程在最后一批之后并且在加入之前被解除阻塞时,它不应该运行另一批,因此该测试。

void run(int& start, int& end, barrier& b)
{
    while (!stop_th){
        // wait for next batch notification - use the next_batch flag to avoid
        // spurious wake-ups
        {
            std::unique_lock<std::mutex> locker(g_lockbatch);
            g_nextbatch.wait(locker, [&](){return (next_batch==true); });
        }

        if (stop_th) return;

        // simulate work
        std::this_thread::sleep_for(std::chrono::seconds(1));

        // wait for everyone to meet
        next_batch = false;
        b.wait();
    }
}

最后是ma​​in

int main()
{
    int end = 100;
    int batch = 10;
    int startBatch = 0;
    int endBatch = startBatch + batch;

    // declare a barrier where all threads will meet
    barrier b(nbChannels+1);

    // declare some channels (threads)
    std::vector<Channel> channels(nbChannels);

    // start the threads
    for (auto& ch : channels) ch.start(startBatch, endBatch, b);

    while (endBatch<=end){

        // notify the channels they can process one batch
        for (auto& ch : channels) ch.go_for_next_batch();
        g_nextbatch.notify_all();

        // wait until all threads have finished their batch
        b.wait();

        // prepare the next one
        startBatch += batch;
        endBatch += batch;
    }

    // all channels are blocked by the next_batch condition
    // so notify a next batch and join them
    for (auto& ch : channels) ch.stop();
    for (auto& ch : channels) ch.go_for_next_batch();
    g_nextbatch.notify_all();
    for (auto& ch : channels) ch.wait_until_stopped();

    return 0;
}

再次感谢您的所有 cmets / 回答!!!

【问题讨论】:

  • 您应该能够将调试器连接到实时进程(例如 Linux 上的 gdb &lt;exe name&gt; &lt;pid&gt;)并列出线程的当前状态。我发现这通常可以很好地表明导致问题的原因。我怀疑您应该尝试坚持使用一个互斥锁或仅按严格顺序锁定/解锁。

标签: c++ multithreading concurrency conditional-statements


【解决方案1】:

由于我修复了 cpp.sh 中的代码,因此我将评论更改为答案,现在似乎已完成。

关于它们在调用停止时不存在。 请注意,它们可能仍处于等待下一个批处理锁的状态。 考虑添加一个调用以将它们从锁定中释放,并让它们检查它们是否在锁定步骤后停止。

将停止函数分为两个函数,一个是更改布尔值,另一个是等待。 让我们调用两个函数 stop 和 wait_until_stopped

然后在main函数中加入如下代码。

代替

for (auto& ch : channels) ch.stop();

用途:

for (auto& ch : channels) ch.stop();

for (auto& ch : channels) ch.go_for_next_batch();

g_nextbatch.notify_all();

for (auto& ch : channels) ch.wait_until_stopped();

【讨论】:

  • +1!确实线程正在等待下一批,您的解决方案完美运行!好吧,至少当我到达那个点时,程序经常在此之前冻结,所以可能还有一些问题......
  • 如果您在程序中冻结,很可能是死锁。锁之间的竞赛。附加一个调试器,看看谁在等待谁喜欢@Component 10 的建议。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-05
相关资源
最近更新 更多