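【Posted】: 2018-09-02 04:17:51
【Problem description】:
I wrote a program to benchmark a MongoDB database under multithreaded bulk-write conditions.
The problem is that the program hangs and never finishes executing.
I am fairly sure the cause is that I am writing 530838 records to the database using 10 threads, each bulk-writing 50 records at a time. That leaves a remainder of 38 records (530838 mod 50), but the run method always takes 50 records from the queue. Once 530800 records have been written, the process hangs and the last 38 records are never written, because the following code never finishes executing: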
for object in range(50):
    objects.append(self.queue.get())
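The arithmetic behind the stall can be checked with a small sketch. It assumes Python 3, where the module is named `queue` (the posted code uses the Python 2 name `Queue`), and it uses a `timeout` on `get()` purely to make the blocking visible instead of waiting forever. The variable names are illustrative and not taken from the post.

```python
import queue  # Python 3 name; the posted Python 2 code imports it as Queue

total_records = 530838
batch_size = 50

# 10616 full batches of 50, with 38 records left over
full_batches, leftover = divmod(total_records, batch_size)
print(full_batches, leftover)

# Put only the leftover records on a queue and try to read a full batch.
q = queue.Queue()
for i in range(leftover):
    q.put(i)

batch = []
try:
    for _ in range(batch_size):
        # Without a timeout this call would block forever on the 39th get()
        batch.append(q.get(timeout=0.1))
except queue.Empty:
    timed_out = True
else:
    timed_out = False

print(len(batch), timed_out)
```

With a plain `get()` and no timeout, the loop would simply wait on the empty queue, which matches the hang described above.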
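I would like the program to write 50 records at a time until fewer than 50 remain, at which point it should write whatever records are left in the queue, and then exit the thread once the queue is empty.
Thanks in advance :)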
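One common way to get the behavior described above is a sentinel value: after all records are queued, the producer puts one "stop" marker per worker, and each worker flushes its partial batch when it sees the marker. The following is a minimal sketch under stated assumptions, not the definitive fix: it targets Python 3 (`queue` rather than `Queue`), the names `STOP`, `worker`, `run`, and `fake_write` are hypothetical, and `fake_write` stands in for `db.threads.insert_many` so the example runs without a database.

```python
import queue
import threading

BATCH_SIZE = 50
N_THREADS = 10
STOP = object()  # hypothetical sentinel meaning "no more records"


def worker(q, write_batch):
    """Collect records into batches of up to BATCH_SIZE until STOP arrives."""
    batch = []
    while True:
        item = q.get()
        if item is STOP:
            break
        batch.append(item)
        if len(batch) == BATCH_SIZE:
            write_batch(batch)
            batch = []
    if batch:
        write_batch(batch)  # flush the final partial batch


def run(records, write_batch):
    q = queue.Queue()
    threads = [threading.Thread(target=worker, args=(q, write_batch))
               for _ in range(N_THREADS)]
    for t in threads:
        t.start()
    for record in records:
        q.put(record)
    for _ in threads:
        q.put(STOP)  # one sentinel per worker so every thread exits
    for t in threads:
        t.join()


written = []
lock = threading.Lock()


def fake_write(batch):
    # Stand-in for db.threads.insert_many(batch); records what was "written"
    with lock:
        written.append(list(batch))


run(range(530), fake_write)
print(sum(len(b) for b in written))
```

Because the threads are joined directly, this sketch does not need `queue.join()` or `task_done()`. Note that each worker keeps its own batch, so with 10 workers there can be up to 10 partial batches at the end rather than exactly one.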
import threading
import Queue
import json
from pymongo import MongoClient, InsertOne
import datetime

# Set the number of threads
n_thread = 10
# Create the queue
queue = Queue.Queue()
# Connect to the database
client = MongoClient("mongodb://mydatabase.com")
db = client.threads

class ThreadClass(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        # Assign thread working with queue
        self.queue = queue

    def run(self):
        while True:
            objects = []
            # Get next 50 objects from queue
            for object in range(50):
                objects.append(self.queue.get())
            # Insert the queued objects into the database
            db.threads.insert_many(objects)
            # signals to queue job is done
            self.queue.task_done()

# Create number of processes
threads = []
for i in range(n_thread):
    t = ThreadClass(queue)
    t.setDaemon(True)
    # Start thread
    t.start()

# Start timer
starttime = datetime.datetime.now()
# Read json object by object
content = json.load(open("data.txt","r"))
for jsonobj in content:
    # Put object into queue
    queue.put(jsonobj)
# wait on the queue until everything has been processed
queue.join()
for t in threads:
    t.join()
# Print the total execution time
endtime = datetime.datetime.now()
duration = endtime-starttime
print(divmod(duration.days * 86400 + duration.seconds, 60))
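A related detail in the program above: `queue.join()` returns only after `task_done()` has been called once for every item that was `put()`, so calling `task_done()` once per 50 `get()` calls would keep `join()` waiting even if the last batch were handled. One way to sidestep both issues is to batch in the producer, so that each queue item is already a batch. The sketch below assumes Python 3; `chunks`, `worker`, and `record_size` are hypothetical names, `None` is used as an assumed stop marker, and `record_size` stands in for the real `insert_many` call.

```python
import queue
import threading
from itertools import islice


def chunks(iterable, size):
    """Yield lists of up to `size` items; the last list may be shorter."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def worker(q, write_batch):
    while True:
        batch = q.get()
        try:
            if batch is None:  # assumed stop marker
                return
            write_batch(batch)
        finally:
            q.task_done()  # exactly one task_done() per get()


sizes = []
lock = threading.Lock()


def record_size(batch):
    # Stand-in for db.threads.insert_many(batch)
    with lock:
        sizes.append(len(batch))


q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q, record_size))
           for _ in range(10)]
for t in threads:
    t.start()
for batch in chunks(range(530838), 50):
    q.put(batch)
for _ in threads:
    q.put(None)  # one stop marker per worker
q.join()
for t in threads:
    t.join()

print(len(sizes), min(sizes))
```

With this layout the final batch of 38 is just another queue item, and no worker ever waits for records that will not arrive.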
【Discussion】:
Tags: python multithreading python-3.x python-2.7 python-multithreading