【问题标题】:Manage Python Multiprocessing with MongoDB使用 MongoDB 管理 Python 多处理
【发布时间】:2018-01-13 19:19:47
【问题描述】:

我正在尝试使用多处理函数运行我的代码,但 mongo 不断返回

"MongoClient 在 fork 之前打开。创建 MongoClient 用 connect=False,或者fork后创建客户端。”

我真的不明白如何使我的代码适应这种情况。 基本上结构是:

db = MongoClient().database
db.authenticate('user', 'password', mechanism='SCRAM-SHA-1')
collectionW = db['words']
collectionT = db['sinMemo']
collectionL = db['sinLogic']


def findW(word):
    rows = collectionw.find({"word": word})
    ind = 0
    for row in rows:
        ind += 1
        id = row["_id"]

    if ind == 0:
        a = ind
    else:
        a = id
    return a



def trainAI(stri):
...
      if findW(word) == 0:

                _id = db['words'].insert(
                    {"_id": getNextSequence(db.counters, "nodeid"), "word": word})
                story = _id
            else:
                story = findW(word)
...


def train(index):
    # searching progress
    progFile = "./train/progress{0}.txt".format(index)
    trainFile = "./train/small_file_{0}".format(index)
    if os.path.exists(progFile):
        f = open(progFile, "r")
        ind = f.read().strip()
        if ind != "":

            pprint(ind)
            i = int(ind)
        else:
            pprint("No progress saved or progress lost!")
            i = 0
        f.close()

    else:
        i = 0
    #get the number of line of the file    
    rangeC = rawbigcount(trainFile)

    #fix unicode
    non_bmp_map = dict.fromkeys(range(0x10000, sys.maxunicode + 1), 0xfffd)
    files = io.open(trainFile, "r", encoding="utf8")
    str1 = ""
    str2 = ""

    filex = open(progFile, "w")

    with progressbar.ProgressBar(max_value=rangeC) as bar:
        for line in files:
            line = line.replace("\n", "")
            if i % 2 == 0:
                str1 = line.translate(non_bmp_map)
            else:
                str2 = line.translate(non_bmp_map)

            bar.update(i)
            trainAI(str1 + " " + str2)
            filex.seek(0)
            filex.truncate()
            filex.write(str(i))
            i += 1

#multiprocessing function

maxProcess = 3

def f(l, i):
    l.acquire()
    train(i + 1)
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(maxProcess):
        pprint("start " + str(num))
        Process(target=f, args=(lock, num)).start()

此代码用于在 4 个不同的进程中读取 4 个不同的文件,同时将数据插入数据库中。 我只复制了部分代码,让你了解它的结构。

我已尝试将 connect=False 添加到此代码中,但没有...

  db = MongoClient(connect=False).database
  db.authenticate('user', 'password', mechanism='SCRAM-SHA-1')
  collectionW = db['words']
  collectionT = db['sinMemo']
  collectionL = db['sinLogic']

然后我尝试在 f 函数中移动它(就在 train() 之前,但我得到的是程序找不到 collectionW、collectionT 和 collectionL。

我不是python或mongodb的专家,所以我希望这不是一个愚蠢的问题。

代码在 Ubuntu 16.04.2 和 python 2.7.12 下运行

【问题讨论】:

  • 这并不是一个新话题,因为数据库连接的“线程安全”的一般概念已经存在了很长时间。可能是为什么错误消息如此具有描述性和精确性。您被告知仅在fork 之后才建立连接,以便连接 存在于工作进程中。如果你想要某种类型的 IPC 那么你使用别的东西来做到这一点。但是在进程/线程之间复制数据库句柄是“正确的”,并且已经很长时间了。

标签: python mongodb python-2.7 pymongo python-multiprocessing


【解决方案1】:

db.authenticate 必须连接到 mongo 服务器,它会尝试建立连接。因此,即使使用了 connect=False,db.authenticate 也需要打开连接。 为什么不在 fork 之后创建 mongo 客户端实例?这看起来是最简单的解决方案。

【讨论】:

  • 但是我到底该怎么做呢?这就是我不明白的地方
  • 您可以在目标函数 f 中创建 mongo 客户端。或者将创建 mongo 客户端的代码放在 f 将调用的函数中。
  • 我已经这样做了,但是正如我在问题中所写的那样,我不知道如何处理需要变量 collectionW、collectionT 和 collectionL 的函数。
【解决方案2】:

由于db.authenticate 必须打开 MongoClient 并连接到服务器,它创建的连接在分叉的子进程中不起作用。因此,错误消息。试试这个:

db = MongoClient('mongodb://user:password@localhost', connect=False).database

另外,删除锁定l。在一个子进程中获取锁对其他子进程没有影响。

【讨论】:

  • 他从主进程创建了锁。因此,它将同步子进程。但是同步子进程在这里看起来并不合适,因为它就像一种大锁一样工作并且违背了使用子进程的目的。
  • 在一个子进程中获取锁对其他子进程没有影响。 =)
  • 实际上多处理锁(不是线程公开的锁)由信号量(github.com/python/cpython/tree/master/Modules/_multiprocessing)支持,因此它们可用于同步不同的进程。因此,如果其他进程也尝试获取相同的锁,获取多进程锁将影响它们。这是一个演示相同的要点gist.github.com/nipuntalukdar/5562d5efd6970495e7632399057d5e99
  • 哦,假设提问者使用的是多进程锁,那么你是对的,它有效果,这违背了使用子进程的目的。
【解决方案3】:

这是我解决问题的方法:

import pathos.pools as pp
import time
import db_access

class MultiprocessingTest(object):

    def __init__(self):
        pass

    def test_mp(self):
        data = [[form,'form_number','client_id'] for form in range(5000)]

        pool = pp.ProcessPool(4)
        pool.map(db_access.insertData, data)

if __name__ == '__main__':

    time_i = time.time()

    mp = MultiprocessingTest()
    mp.test_mp()

    time_f = time.time()

    print 'Time Taken: ', time_f - time_i

这里是 db_access.py:

from pymongo import MongoClient

def insertData(form):
    client = MongoClient()
    db = client['TEST_001']
    db.initialization.insert({
        "form": form[0],
        "form_number": form[1],
        "client_id": form[2]
    })

这发生在您的代码上,因为您为所有子进程启动了一次 MongoCLient()。 MongoClient 不是分叉安全的。因此,在每个函数内部启动都可以,如果有其他解决方案,请告诉我。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-12-17
    • 2013-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-08
    相关资源
    最近更新 更多