【问题标题】:How to limit the number of running instances in C++如何在 C++ 中限制运行实例的数量
【发布时间】:2014-03-20 21:44:50
【问题描述】:

我有一个分配大量内存的 c++ 类。它通过调用一个第三方库来做到这一点,该第三方库被设计为在无法分配内存时崩溃,有时我的应用程序会在并行线程中创建我的类的多个实例。线程太多我崩溃了。 对于解决方案,我最好的想法是确保永远不会有超过三个实例同时运行。 (这是一个好主意吗?) 我目前实现 that 的最佳想法是使用 boost 互斥锁。类似于以下伪代码的内容,

MyClass::MyClass(){
  my_thread_number = -1; //this is a class variable
  while (my_thread_number == -1)
    for (int i=0; i < MAX_PROCESSES; i++)
      if(try_lock a mutex named i){
        my_thread_number = i;
        break;
      }
  //Now I know that my thread has mutex number i and it is allowed to run
}

MyClass::~MyClass(){
    release mutex named my_thread_number
}

如您所见,我不太确定此处互斥锁的确切语法。所以总结一下,我的问题是

  1. 当我想通过限制线程数来解决内存错误时,我是否走在正确的轨道上?
  2. 如果是,我应该使用互斥锁还是其他方式?
  3. 如果是,我的算法是否合理?
  4. 是否有一个很好的例子说明如何将 try_lock 与 boost 互斥锁一起使用?

编辑:我意识到我在谈论线程,而不是进程。 编辑:我正在参与构建一个可以在 linux 和 Windows 上运行的应用程序......

【问题讨论】:

  • 你能不能只在某个地方保留一个随着每次创建实例而递增的静态变量,其中新实例的创建取决于所述变量低于您设置的限制?
  • 为什么你的问题一半谈论线程,一半谈论进程?
  • @jalf :对不起,我的意思是线程。我混淆了这些概念。
  • @MMJZ 一个静态变量听起来很诱人,但没有办法使构造安全免受崩溃,有没有......?

标签: c++ boost boost-mutex


【解决方案1】:

更新我的其他答案解决了线程之间的资源调度问题(在问题得到澄清之后)。

它显示了在(许多)工作人员之间协调工作的信号量方法,以及首先限制工作人员并将工作排队的thread_pool

在 linux(可能还有其他操作系统?)上,您可以使用锁定文件习语(但某些文件系统和旧内核不支持它)。

我建议使用进程间同步对象。

例如,使用名为 semaphore 的 Boost Interprocess:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>

int main()
{
    using namespace boost::interprocess;
    named_semaphore sem(open_or_create, "ffed38bd-f0fc-4f79-8838-5301c328268c", 0ul);

    if (sem.try_wait())
    {
        std::cout << "Oops, second instance\n";
    }
    else
    {
        sem.post();

        // feign hard work for 30s
        boost::this_thread::sleep_for(boost::chrono::seconds(30));

        if (sem.try_wait())
        {
            sem.remove("ffed38bd-f0fc-4f79-8838-5301c328268c");
        }
    }
}

如果您在后台启动一个副本,新副本将在大约 30 秒内“拒绝”启动(“糟糕,第二个实例”)。

我觉得在这里颠倒逻辑可能更容易。嗯。让我试试。

一段时间过去了

呵呵。这比我想象的要棘手。

问题是,您要确保当您的应用程序被中断或终止时锁不会保留。为了分享便携式处理信号的技术:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <boost/asio.hpp>

#define MAX_PROCESS_INSTANCES 3

boost::interprocess::named_semaphore sem(
        boost::interprocess::open_or_create, 
        "4de7ddfe-2bd5-428f-b74d-080970f980be",
        MAX_PROCESS_INSTANCES);

// to handle signals:
boost::asio::io_service service;
boost::asio::signal_set sig(service);

int main()
{

    if (sem.try_wait())
    {
        sig.add(SIGINT);
        sig.add(SIGTERM);
        sig.add(SIGABRT);
        sig.async_wait([](boost::system::error_code,int sig){ 
                std::cerr << "Exiting with signal " << sig << "...\n";
                sem.post();
            });
        boost::thread sig_listener([&] { service.run(); });

        boost::this_thread::sleep_for(boost::chrono::seconds(3));

        service.post([&] { sig.cancel(); });
        sig_listener.join();
    }
    else
    {
        std::cout << "More than " << MAX_PROCESS_INSTANCES << " instances not allowed\n";
    }
}

那里有很多可以解释的。如果您有兴趣,请告诉我。

注意 很明显,如果在您的应用程序中使用了kill -9(强制终止),那么所有的赌注都将被取消,您将不得不删除 Name Semaphore 对象或显式解锁它(post())。

这是我系统上的测试:

sehe@desktop:/tmp$ (for a in {1..6}; do ./test& done; time wait)
More than 3 instances not allowed
More than 3 instances not allowed
More than 3 instances not allowed
Exiting with signal 0...
Exiting with signal 0...
Exiting with signal 0...

real    0m3.005s
user    0m0.013s
sys 0m0.012s

【讨论】:

  • 我刚刚修改了我的第二个示例,使其在面对进程中断/终止时更加稳健,并演示了如何一次限制为 3 个实例。
  • 感谢@sehe 的长篇大论和写得很好的回复-但您似乎解决了流程,而我想到的案例实际上是关于线程的...再次抱歉:) 我的案例应该更容易对吧?
  • @EmilFredrik ahahahaha ... wat!? :) 是的。那更简单。如:这是一个已解决的问题。查看我的新答案:stackoverflow.com/a/22553946/85371
【解决方案2】:

这是实现您自己的“信号量”的一种简单方法(因为我认为标准库或 boost 没有)。这选择了一种“合作”的方法,工人会互相等待:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

void the_work(int id)
{
    static int running = 0;
    std::cout << "worker " << id << " entered (" << running << " running)\n";

    static mutex mx;
    static condition_variable cv;

    // synchronize here, waiting until we can begin work
    {
        unique_lock<mutex> lk(mx);
        cv.wait(lk, phoenix::cref(running) < 3);
        running += 1;
    }

    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";

    // signal one other worker, if waiting
    {
        lock_guard<mutex> lk(mx);
        running -= 1;
        cv.notify_one(); 
    }
}

int main()
{
    thread_group pool;

    for (int i = 0; i < 10; ++i)
        pool.create_thread(bind(the_work, i));

    pool.join_all();
}

现在,我想说让一个由 n 个工人组成的专用池轮流从队列中取出他们的工作可能会更好:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}

PS。如果你愿意,你可以在那里使用 C++11 lambdas 而不是 boost::phoenix。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-08-25
    • 1970-01-01
    • 1970-01-01
    • 2013-08-08
    • 2011-05-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多