【Title】: Read & delete from MongoDB with Python multiprocessing
【Posted】: 2019-10-26 18:00:23
【Question】:

I have a MongoDB with a collection incoming and a collection target. A single worker process currently does the following (simplified):

def worker(number):
    incomings = db.incoming.find()
    buffersize = 5
    readcounter = 0
    documentsToInsert = []
    documentsToDelete = []
    for incoming in incomings:
        readcounter += 1
        documentToInsert = {'keyfield': incoming["keyfield"]}  # + other fields after some treatments...
        documentsToInsert.append(documentToInsert)
        documentsToDelete.append({'_id': incoming["_id"]})
        if readcounter >= buffersize:
            readcounter = 0
            db.incoming.remove({'_id': {'$in': [doc["_id"] for doc in documentsToDelete]}})
            db.target.insert_many(documentsToInsert, ordered=False)
            documentsToInsert = []
            documentsToDelete = []

Of course both the remove and the insert_many statements are wrapped in try/except.

Since data arrives faster than a single worker can process it, I need to get faster, e.g. by spawning workers on all CPUs, which should be done anyway for efficiency. I am doing that with the following code:

if __name__== "__main__":
    procs=[]
    number=0
    for cpu in range(multiprocessing.cpu_count()):
        procs.append(multiprocessing.Process(target = worker, args = (number,)))
        number+=1
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print("=====================FIN=========================")

The problem is that while one process is reading its buffersize documents, the other processes fetch the same documents, so only one of them succeeds in inserting into target while the others run into duplicate key exceptions. This effect means only one process does useful work. Without multiprocessing, the remove/insert_many combination works fine and I can easily use a much larger buffer.

I have considered inserting the data into incoming with an additional field to assign each document to a worker number, but that takes extra disk space and extra processing, and besides, at the time the data is generated I don't know how many workers will be processing it.

I have also tried sleeping a random amount of time in each process, but that is completely unpredictable and does not prevent the errors by itself.

What can I do so that all processes work on different data?

【Comments】:

  • Have you considered something like RabbitMQ? It would let you decouple the rate of incoming messages from the rate of processing.
  • You mean something like: one thread does nothing but read, feeding the documents to the other threads for processing? Sounds interesting, especially because with only one reader instead of many it solves a) reading distinct data and b) sparing the resources otherwise spent on many readers. With *mq (I prefer 0mq) the number of workers is basically unlimited. I'll give it a try. Thanks for the idea!

Tags: python mongodb multithreading pymongo python-multiprocessing


【Solution 1】:

As per my comment, I think a message broker like RabbitMQ fits your use case best. With RabbitMQ and similar message brokers (I have not used 0mq) you don't need to feed the other threads yourself: just start as many as you need, have each one subscribe, and the broker will deliver the messages to them in turn.
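The delivery pattern this describes, one feed and many competing consumers each receiving distinct messages, can be sketched with a plain multiprocessing.Queue standing in for the broker (an illustrative sketch, not the RabbitMQ API):

```python
import multiprocessing

def consumer(queue, results):
    """Competing consumer: processes whatever the 'broker' hands it next."""
    while True:
        doc_id = queue.get()
        if doc_id is None:          # poison pill: shut down
            break
        results.append(doc_id)      # stand-in for the insert into target

def run(nworkers=3, ndocs=12):
    queue = multiprocessing.Queue()
    manager = multiprocessing.Manager()
    results = manager.list()
    workers = [multiprocessing.Process(target=consumer, args=(queue, results))
               for _ in range(nworkers)]
    for w in workers:
        w.start()
    # a single producer feeds the queue, so no two workers ever see the same doc
    for doc_id in range(ndocs):
        queue.put(doc_id)
    for _ in workers:
        queue.put(None)             # one pill per worker
    for w in workers:
        w.join()
    return sorted(results)

if __name__ == "__main__":
    print(run())  # every one of the 12 docs processed exactly once
```

Because each document is dequeued by exactly one consumer, the duplicate key problem disappears by construction.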

【Discussion】:

    【Solution 2】:

    Thanks to @Belly Buster for the idea of using *MQ to decouple the processing. I have solved this with ZeroMQ, which is brokerless, but in this case I implemented a load-balancing broker based on the Load balancing broker example for ZeroMQ. The client reads from the database while the workers process the entries they receive via ZeroMQ. I have tried to put some comprehensive comments into the code to clarify a few points. The code is missing a couple of utility classes I wrote which are not part of the solution; it is only meant to answer the question, and I hope anyone finds it useful.

    """
    Original Author: Brandon Carpenter (hashstat) <brandon(dot)carpenter(at)pnnl(dot)gov>
    This code was part of the ZeroMQ Tutorial and implements the Load-balancing broker pattern.
    Modified by @https://stackoverflow.com/users/2226028/michael
    """
    
    from __future__ import print_function
    
    import multiprocessing
    import ctypes
    import sys
    import ast
    from datetime import datetime
    from random import randint
    
    import zmq
    from pymongo import MongoClient
    
    from PairConfig import PairConfig
    from PairController import PairController
    
    NBR_CLIENTS = 1
    NBR_WORKERS = 3
    
    # Load the configuration file
    # this is a configuration class which is not documented here
    pairConfig=PairConfig("verify.ini")
    
    # multiprocessing shared variables setup
    manager = multiprocessing.Manager()
    insertbuffer=manager.list()
    deletebuffer=manager.list()
    totalcounter=multiprocessing.Value(ctypes.c_int,0)
    
    def client_task(ident):
        try:
            """Basic request-reply client using REQ socket."""
            client = MongoClient(pairConfig.config.get('db','url'))
            db=client.databasename
            socket = zmq.Context().socket(zmq.REQ)
            socket.identity = u"Client-{}".format(ident).encode("ascii")
            socket.connect("ipc://frontend.ipc")
    
            while True:
                incomings = db.incoming.find()
                # this makes it safe(r) to run this on different nodes
                incomings.skip(randint(randint(1,500),randint(5000,500000)))
                for incoming in incomings:
                    pair = {'primarykey' : incoming["primarykey"], 'value' : incoming["value"]}
                    # Send request, get reply
                socket.send_string(str(pair))  # send_string expects str, not bytes
                    reply = socket.recv()
        except KeyboardInterrupt:
            print("\nexit client")
    
    def worker_task(ident,insertbuffer,deletebuffer,mylock):
        try:
            """Worker task, using a REQ socket to do load-balancing."""
            socket = zmq.Context().socket(zmq.REQ)
            socket.identity = u"Worker-{}".format(ident).encode("ascii")
            socket.connect("ipc://backend.ipc")
    
            socket.send(b"READY")
    
            # this is a helper class which is not documented here
            pairController=PairController(pairConfig)
            while True:
                address, empty, request = socket.recv_multipart()
                with totalcounter.get_lock():
                    totalcounter.value+=1
            dictToInsert = ast.literal_eval(request.decode("ascii"))  # request arrives as bytes
                dictToInsert["last_checked"]=datetime.now()
                insertbuffer.append(dictToInsert)
                deletebuffer.append(dictToInsert["primarykey"])
                # ... do some timely treatment here - a lot of variable time gets burned here ...
                # result will be result1 and result2, for the sake of simplification I will fill it with random numbers here
                result1=randint(1,10)
                result2=randint(1,10)
                sys.stdout.write("%s %s insertbuffer: %d, deletebuffer: %d, totalcounter: %d, b: %s, r: %s            \r" % (socket.identity.decode("ascii"),dictToInsert["primarykey"],len(insertbuffer),len(deletebuffer),totalcounter.value,result1,result2))
                sys.stdout.flush()
                # readbuffer comes from an ini file ... I chose 500 for now
                if len(insertbuffer[:]) >= int(pairConfig.config.get('verify','readbuffer')) and ident==0:
                    mylock.acquire()
                    # these 2 methods are inside a class pairController which is not documented here,
                    # it's basically one method for insert_many() and one method for remove(), 
                    # each time with the respective buffer as a filter
                    pairController.storePairs("history",insertbuffer[:])
                    pairController.deletePairs("history",deletebuffer[:])
                    # this empties the buffers for all filters:
                    insertbuffer[:]=[]
                    deletebuffer[:]=[]
                    mylock.release()
                socket.send_multipart([address, b"", b"ok"])
        except KeyboardInterrupt:
            print("\nexit worker")
    
    def main():
        """Load balancer main loop."""
        # Prepare context and sockets
        context = zmq.Context.instance()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind("ipc://frontend.ipc")
        backend = context.socket(zmq.ROUTER)
        backend.bind("ipc://backend.ipc")
    
        # Start background tasks
        mylock = multiprocessing.Lock()
        def start(task, *args):
            process = multiprocessing.Process(target=task, args=args)
            process.daemon = True
            process.start()
        for i in range(NBR_CLIENTS):
            start(client_task, i)
        for i in range(NBR_WORKERS):
            start(worker_task, i, insertbuffer, deletebuffer, mylock)
    
        # Initialize main loop state
        count = NBR_CLIENTS
        workers = []
        poller = zmq.Poller()
        # Only poll for requests from backend until workers are available
        poller.register(backend, zmq.POLLIN)
    
        while True:
            sockets = dict(poller.poll())
    
            if backend in sockets:
                # Handle worker activity on the backend
                request = backend.recv_multipart()
                worker, empty, client = request[:3]
                if not workers:
                    # Poll for clients now that a worker is available
                    poller.register(frontend, zmq.POLLIN)
                workers.append(worker)
                if client != b"READY" and len(request) > 3:
                    # If client reply, send rest back to frontend
                    empty, reply = request[3:]
                    frontend.send_multipart([client, b"", reply])
                    count -= 1
    
            if frontend in sockets:
                # Get next client request, route to last-used worker
                client, empty, request = frontend.recv_multipart()
                worker = workers.pop(0)
                backend.send_multipart([worker, b"", client, b"", request])
                if not workers:
                    # Don't poll clients if no workers are available
                    poller.unregister(frontend)
    
        # Clean up
        backend.close()
        frontend.close()
        context.term()
    
    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print("\nexit main")
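The buffer-flush part of worker_task can be exercised in isolation. Here storePairs()/deletePairs() are replaced by plain in-memory sinks, since PairController is not shown (a sketch under that assumption, not the original class):

```python
import multiprocessing

READBUFFER = 5   # stand-in for pairConfig.config.get('verify', 'readbuffer')

def handle(pair, insertbuffer, deletebuffer, stored, lock):
    """One incoming message: buffer it, flush both buffers together once full."""
    insertbuffer.append(pair)
    deletebuffer.append(pair["primarykey"])
    with lock:
        if len(insertbuffer) >= READBUFFER:
            stored.extend(insertbuffer)      # stand-in for storePairs()
            # a real worker would call deletePairs(deletebuffer) here
            insertbuffer[:] = []             # empty the shared buffers for all workers
            deletebuffer[:] = []

def run(nmessages=12):
    manager = multiprocessing.Manager()
    insertbuffer = manager.list()
    deletebuffer = manager.list()
    stored = manager.list()
    lock = manager.Lock()
    for i in range(nmessages):
        handle({"primarykey": i, "value": i * i},
               insertbuffer, deletebuffer, stored, lock)
    return len(stored), len(insertbuffer)

if __name__ == "__main__":
    # 12 messages with a buffer of 5: two flushes of 5, two left buffered
    print(run())
```

Emptying both proxy lists with slice assignment, as the answer's code does, is what makes the reset visible to every process holding the same manager proxies.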
    

    【Discussion】:
