【问题标题】:C++11 std::thread queue consumer [closed]C++ 11 std::thread 队列消费者 [关闭]
【发布时间】:2015-08-04 18:14:35
【问题描述】:

我正在研究实现一个 FIFO 线程队列,其中创建了线程 lambda,但一次只处理 X 个线程。

我假设线程将捕获一些局部变量,但没有别的。

我正在尝试做的一个最小示例是:

#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>

int main()
{
    std::vector<std::thread> threads;
    std::condition_variable condition;
    std::mutex mutex;
    std::atomic<unsigned int> thread_count( 0 );

    for ( int i = 0; i < 20; i++ )
    {
        std::unique_lock<std::mutex> lock ( mutex );
        condition.wait( lock, [&]{ return thread_count < 8;} );

        std::thread t = std::thread([&,i]()
        {
            printf ("I am Thread #: %d with current thread count: %u\n", i, unsigned( thread_count ) );
            thread_count--;
        });

        threads.push_back( std::move( t ) );
        thread_count++;
    }

    for ( auto & t : threads ) t.join();

    return 0;
}

我想在那个 for 循环中创建线程对象/lamdas,然后让它们最多运行 8 个线程。

当前输出为:

I am Thread #: 0 with current thread count: 3 
I am Thread #: 3 with current thread count: 4 
I am Thread #: 2 with current thread count: 4 
I am Thread #: 5 with current thread count: 3 
I am Thread #: 1 with current thread count: 3 
I am Thread #: 4 with current thread count: 5 
I am Thread #: 6 with current thread count: 1 
I am Thread #: 7 with current thread count: 1 
I am Thread #: 8 with current thread count: 1 
I am Thread #: 9 with current thread count: 1 
I am Thread #: 10 with current thread count: 1 
I am Thread #: 11 with current thread count: 1 
I am Thread #: 12 with current thread count: 1 
I am Thread #: 13 with current thread count: 1 
I am Thread #: 14 with current thread count: 1 
I am Thread #: 15 with current thread count: 1 
I am Thread #: 16 with current thread count: 1 
I am Thread #: 17 with current thread count: 1 
I am Thread #: 18 with current thread count: 1 
I am Thread #: 19 with current thread count: 1

这显然永远不会达到 8 个线程的最大数量。

您可以亲自查看here

【问题讨论】:

  • 不清楚你在问什么,你的代码示例与这个问题有什么关系。这也读得太多了 gimmetehcodez
  • 恭喜你实现了对这段代码的渴望。继续一个实际的问题..
  • 您没有尝试自己实现它。你只是要求别人为你做这件事。您的答案可以在 C++ Concurrency in Action Practical Multithreading, Anthony Williams, 2012, §9.1 中找到。
  • @bky_drytt 你是什么意思我没有尝试自己实现它?上面的代码是我在收到 Puppy 和 Barry 的批评后写的。我不是要求任何人用勺子喂我,而只是帮助编写这样的循环/机制/模式。我的印象是这就是 SO 的用途。任何链接或其他提示就足够了。
  • 你是对的。我对指控感到抱歉。您应该查看我在之前评论中指向您的书的部分,因为它正是您需要的。

标签: c++ multithreading c++11 gcc threadpool


【解决方案1】:

似乎您正在创建的线程终止的速度比主循环创建新线程的速度要快。所以你不会看到 8 个并行线程在运行,而是更少。

请注意,程序输出将是不确定的,因为线程调度在某种程度上是不可预测的。如果调用得足够频繁,您的程序实际上可能不时运行 8 个并行线程...

如果您更改 threadfunc lambda 使其不会立即终止,您将看到线程堆积。您可以通过在 threadfunc lambda 中添加一些对 sleep 的调用来实现。

请注意,您的程序可能会挂起,因为主循环将等待条件变量。但是没有人在这个条件下发出信号,所以它可能会永远挂起。

因此,您可能希望将 threadfunc lambda 末尾的 thread_count--; 行更改为:

std::unique_lock<std::mutex> lock ( mutex );
thread_count--;
condition.notify_one();

这不仅会减少线程计数,还会向条件变量发出信号,以便主循环可以唤醒并在一个终止时继续创建额外的线程。

完整的工作示例,每个线程随机延迟休眠(假装一些实际工作):

#include <stdio.h>
#include <mutex>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <condition_variable>
#include <chrono>
#include <random>

int main()
{
    std::vector<std::thread> threads;
    std::condition_variable condition;
    std::mutex mutex;
    std::atomic<unsigned int> thread_count(0);
    std::mt19937 generator;

    for (int i = 0; i < 20; i++) {
        std::unique_lock<std::mutex> lock (mutex);
        condition.wait(lock, [&]{ return thread_count < 8; });
        // random sleep delay
        auto delay = generator() % 10000;

        std::thread t = std::thread([&, i, delay]() {
            printf("I am Thread #: %d with current thread count: %u\n", i, unsigned(thread_count));
            // ok to access the generator here because 
            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
            printf("I am Thread #: %d and I am done\n", i);

            // signal that this thread is done
            std::unique_lock<std::mutex> lock(mutex);
            thread_count--;
            condition.notify_one();
        });

        threads.push_back(std::move(t));
        thread_count++;
    }

    for (auto& t : threads) {
      t.join();
    }
    return 0;
}

【讨论】:

  • 非常感谢您的帮助!这是运行线程队列的正确/正确/正常方式,还是有一些我不知道的其他模式?
  • 上面的线程队列使用了互斥体,所以至少工作项/线程的创建是单线程的(而工作执行不是)。除此之外,创建线程并在完成工作后将它们丢弃也会增加开销。一旦有新工作可用,保留线程并重新使用它们可能会更便宜。总体而言,与其滚动自己的线程队列,不如至少寻找一些现有的和经过实战考验的线程池和无锁队列组件。我不会在这里推荐任何东西,因为哪些最好取决于您的要求。
猜你喜欢
  • 1970-01-01
  • 2014-10-02
  • 2014-10-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多