【问题标题】:Python multithreading using a different setting per thread每个线程使用不同设置的 Python 多线程
【发布时间】:2015-06-25 12:15:05
【问题描述】:

我对 Python 比较陌生,最后一次接触 C 中的线程和进程大约是 7 年前,所以请在您的回复中将我视为新手。

我在 Linux 上使用 Python 2.7.6。

我正在尝试查询(稍后从其中下载)一个在线存档,该存档仅允许每个注册用户进行一个连接,而且速度很慢。它有自己的查询 API,所以我不会深入讨论。我打算在并行线程中执行查询和下载,每个用户帐户一个。 (郑重声明我没有作弊,所有账号都是正版用户!)

accounts = ['user1','pass1','user2','pass2'...]
queries = ['query1','query2','query3',..., 'queryN' ]

numQueries = len(queries) 
numAc = len(accounts)/2

if numQueries < numAc:
  nThreads = numQueries 
else
  nThreads = numAc # most likely situation

# example of function for the query 
def runQuery(user, passw, query):
  # here's the API bit

我看到的每个示例都运行在一个列表上。

所以,我很茫然。如果我们忘记所有关于帐户和约束并且只是运行不同的查询,我可以看到它会如何工作。

如何为每个帐户设置一个线程并遍历查询/下载列表?记住我使用的是 2.7。

我也被线程/进程问题弄得不知所措,因此希望您能清楚地回答。

--- 编辑 - 由于下面评论中的代码不可读,这是我尝试过的:

ulock = thread.allocate_lock()

def runQuery(userQueue, ulock, queryQueue):
    query = queryQueue.get()
    with ulock:
        user = userQueue.popleft()
        userQueue.append(user)
        passw = userQueue.popleft()
        userQueue.append(passw)
    print 'The executed query will use: ' + user + ' ' + passw + ' ' + ' ' + query + '\n'

for t in nThreads:    
    thread.start_new_thread(runQuery, (userQueue, ulock, queryQueue,))

【问题讨论】:

  • 我通常避免使用pass 作为变量名(在runQuery 函数签名中),因为它是Python 中“什么都不做”的关键字。以后你用passw,效果会好很多。
  • 您也可以为每个用户创建一个新的multiprocessing.Process,然后开始上述过程?这是一个很好的解释,从你的角度来看,如果你选择线程或进程,我认为这没有任何区别:*.com/questions/17172878/…

标签: python multithreading python-2.7


【解决方案1】:

如果您发现您想要做的只是将查询分配到您拥有的用户帐户上,我认为您的问题有一个更简单的答案,这样没有两个线程同时使用相同的凭据。

这意味着:将每个查询分配给一个用户帐户(循环访问用户帐户,因为您没有与查询一样多的帐户),然后按用户帐户分组,并让每个线程运行分配给单个用户帐户的所有查询.每个线程接收一组凭据,因此不会出现并发问题。

"""
Distributes a number N of queries over a set of M user accounts
"""
from itertools import izip, cycle, groupby
from threading import Thread


def run_query(account, queries):
    """Run a number of queries under the same account"""
    user = account[0]
    passw = account[1]
    for query in queries:
        print 'The executed query will use: ' + user + ' ' + passw + ' ' + ' ' + query + '\n'


def main():
    """Distributes the queries and then runs them in threads"""
    accounts = [('user1', 'pass1'), ('user2', 'pass2'), ('userM', 'passM')]
    queries = ['query1', 'query2', 'query3', 'queryN']

    assignments = list(izip(cycle(accounts), queries))
    assignments = sorted(assignments, key=lambda (account, query): account)
    # [(('user1', 'pass1'), 'query1'), (('user1', 'pass1'), 'queryN'),
    #  (('user2', 'pass2'), 'query2'),
    #  (('userM', 'passM'), 'query3')]

    for account, assigned in groupby(assignments, lambda (account, query): account):
        queries = [item[1] for item in list(assigned)]
        Thread(target=run_query, args=(account, queries)).start()

if __name__ == '__main__':
    main()

一些注意事项:

  • 我将用户和密码分组为元组;这似乎是有道理的,即使它们在您的代码中分开也不难。
  • 有 N 个查询和 M 个用户,其中 N > M。请注意,循环访问用户帐户会导致查询 N(最后一个)分配给用户 1。
  • 对分配进行排序是groupby 函数的要求。
  • 使用itertools.groupby 可能会很棘手。请注意,结果的assigned 部分是assigments 元素的迭代器,因此每个元素又有一个account, query 元组。重要的部分是这个迭代器将只返回单个帐户的那些元素;我们提取查询部分并在线程上运行。
  • 顺便说一句,我发现threading.Threadthreading.start_new_thread 简单得多。无需join 已启动的线程。
  • 没有Queues,没有互斥体,什么都没有。知道你的钥匙就是一切:)

【讨论】:

  • 理想情况下,如果用户数量超过查询数量,我想尝试处理该数量,但我认为这是次要的。您的建议非常有效,尽管我目前正试图理解这一切。我更喜欢它而不是第一个响应,因为当我来管理下载时,可能有数百个文件,这种方法看起来应该可以很好地处理它们。
  • 如果用户数大于查询数(N cycle 永远不会回头,每个用户都可以收到一个query 将收到一个查询。
  • 很抱歉周五没有回复您,但我现在已经测试了所有选项,并且最喜欢这种方法的灵活性。它也可以很好地扩展到数据的下载。
【解决方案2】:

最简单的方法是(使用您当前的结构)使用旧线程模块:

from thread import start_new_thread

start_new_thread(runQuery,(user1,pass1,query1,))
start_new_thread(runQuery,(user2,pass2,query2,))
start_new_thread(runQuery,(user3,pass3,query3,))

所有这些查询将并行运行,直到它们的函数 runQuery 返回。如果您不需要来自线程的反馈,则无需处理同步。

现在看来您确实需要同步,请这样做:

定义一个查询队列,将所有查询添加到:

from Queue import Queue

queryQueue = Queue()
queryQueue.put(query1)
queryQueue.put(query2)
queryQueue.put(query3)

现在使用对 queryQueue 的引用来启动您的线程:

start_new_thread(runQuery,(user1,pass1,queryQueue,))
start_new_thread(runQuery,(user2,pass2,queryQueue,))
start_new_thread(runQuery,(user3,pass3,queryQueue,))

在你的 run 方法的开头这样做:

def runQuery(user, pass, queryQueue):
    query = queryQueue.get()

队列是任务安全的,这意味着它会为您处理所有必要的同步。

【讨论】:

  • 这听起来是一个很好的起点。我已经稍微编辑了这个问题(参见“queries = line”),因为查询通常比帐户多,所以我需要遍历查询。
  • 意味着添加 - 我可以用一个简单的循环和计数器来做到这一点,但在查询完成之前我不能重用帐户,所以我确实需要在那里进行一些同步
  • 我尝试根据您的建议进行旋转,循环访问用户。但它无法处理所描述的所有情况:用户 > 查询和用户 def runQuery(userQueue, ulock, queryQueue): query = queryQueue.get() with ulock: user = userQueue.popleft() userQueue.append(user) passw = userQueue.popleft() userQueue.append(passw) print user passw query + '\n' thread.start_new_thread(runQuery, (userQueue, ulock, queryQueue,))是这样吗?!
  • 抱歉 - 点击了错误的编辑按钮!! :) 我不知道它是如何让我编辑你的答案的。希望它不会通过同行评审!
  • 抱歉,周末没空。由于 Queue 是线程安全的,因此您不需要使用锁。据我所知,您不能同时在两个线程上使用用户,对吗?在这种情况下,只需将用户附加到其 runQuery() 的末尾。此外,您不应该单独 append()/pop() 用户和密码,而是将它们放入一个元组,然后 append()/pop() 元组。这样您就可以保证用户名和密码保持一致。
最近更新 更多