【问题标题】:Calling a Python function from multiple threads in C++从 C++ 中的多个线程调用 Python 函数
【发布时间】:2019-07-23 11:03:02
【问题描述】:

找到了几个类似的主题,但没有任何帮助。

基本上,我有一个 C++ 应用程序,它想从 Python 脚本调用一个函数。这一切都很好。

但是因为我需要它在一定程度上实时工作并且 python 函数需要一点时间,所以我想添加多线程。

基本上是两种情况之一:

  1. C++ 中的线程池,每个线程都将调用 Python 函数。

  1. Python 中的线程池,C++ 应用正在将任务添加到 Python 队列中。

我更喜欢第一个选项,所以让我们开始吧。基本上,我有一个带有函数的python脚本(实际上,有一个Tensorflow预测):

import time
def pfoo(msg):
  print("Python >> Function called)
  time.sleep(2)
  print("Python << Function finished)

还有一个 C++ 线程池(大部分取自 here):

ThreadPool pool(4);

// initialize the Python
Py_Initialize();
// initialize thread support
PyEval_InitThreads();

// ...

PyObject* m_PyModule = PyImport_ImportModule( "test" );

PyObject* m_PyDict = PyModule_GetDict( m_PyModule );

PyObject* m_PyFoo = PyDict_GetItemString( m_PyDict, "pfoo" );

for ( int i = 0; i < 10; i++ ) { 

    pool.enqueue( [&] { 

      PyEval_CallObject( m_PyFoo, Py_BuildValue( "(s)", arg ) );

    } );
}

正如您可以想象的那样,什么都没有发生,因为您不能在同一函数仍在运行时调用它。 我尝试了Py_BEGIN_ALLOW_THREADS 宏,我尝试了PyGILState_Ensure()this 更复杂的选项。我没有想法。

尝试了第二种情况,我在 Python 中有一个无限循环线程,它从 queue.Queue() 读取任务并将它们放入 ThreadPoolExecutor,然后 C++ 应用程序调用一个函数将任务添加到上述队列。对我也不起作用(如果我只是在 Python 中运行它就可以,但如果嵌入到 C++ 中就不起作用)。

【问题讨论】:

  • 如果每个线程运行一个单独的 Python 解释器,并且可以调用相同的函数但它们不能相互交互,对您来说可以吗?在 Python 中使用多处理怎么样?
  • @JohnZwinck 我想过,但由于我需要加载一个 Tensorflow 模型并分配几 GB 的 GPU 内存来使用它,这可能很快就会出现问题。

标签: python c++ multithreading


【解决方案1】:

我想我可能已经找到了一个可以接受的解决方案。使用合成示例,但需要使用 Tensorflow 预测进行实时测试:

python 脚本仍然使用线程池,但没有无限线程循环:

from concurrent.futures import ThreadPoolExecutor
import threading
import time

executor = ThreadPoolExecutor(max_workers=4)
lock = threading.Lock()
counter = 0

def worker(msg, n):
  global counter

  with lock:
    counter += 1

  print("Python >> Function called with (%s, %d)" % (msg, n))

  for i in range(n): # do some work...
    print("\tPython :: %s: %d" % (msg, i))
    time.sleep(1)

  print("Python << Finished function for (%s, %d)" % (msg, n))

  with lock:
    counter -= 1
    if counter < 0:
      counter = 0


def add(msg, n):
  global executor, counter
  if counter < 4:
    print("++%d threads are free, adding task: %s" % (4 - counter, msg))
    executor.submit(worker, msg, n)

而 C++ 应用程序只需在必要时调用“add”函数:

std::map<std::string, PyObject*> m_PyFunctions;

m_PyFunctions["add"] = PyDict_GetItemString( m_PyDict, "add" );

for ( int i = 0; i < 100; i++ ) 
{
    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );

    PyEval_CallObject( m_PyFunctions["add"], 
                       Py_BuildValue( "(s, l)", "task_#" + std::to_string( i ), std::rand() % 5 ) );
}

编辑:是的,用我的主应用程序对此进行了测试,它工作正常。一个粗略的解决方案,但仍然可以通过。

【讨论】:

  • 您从哪里获得 C++ 代码中的 Python GIL?我认为您需要这样做,否则您的代码可能会崩溃。
  • @JohnZwinck 我不...这是奇怪的部分。
  • @JohnZwinck 不确定这个合成示例中的 CPU 利用率,但添加两行简单的代码来输出当前的 thread_id 肯定表明线程池工作和任务在不同的线程中同时执行。主要代码实际上是在 GPU 上运行的机器学习(预测)任务。
  • 非常有趣的例子!您使用哪种 Python?我尝试使用 Python 2.7,似乎线程仅在 add(msg, n) 调用期间处于活动状态。换句话说,如果我在上述函数中添加延迟,我会在那段时间开始看到“Python :: task_#...”...
  • @ArtemBobritsky 那是 Python 3.6,如果有记忆的话......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-05-15
  • 1970-01-01
  • 1970-01-01
  • 2010-10-05
  • 2014-11-10
  • 2017-01-15
  • 1970-01-01
相关资源
最近更新 更多