【问题标题】:MultiThreading with a python loop使用 python 循环的多线程
【发布时间】:2016-02-23 19:51:54
【问题描述】:

我试图在处理器的多个线程上运行此 Python 代码,但我找不到如何分配多个线程。我在 Jupyter(以前的 IPython)中使用 python 2.7。 初始代码如下(所有这部分工作完美)。它是一个网络解析器,它接受x 即 my_list 中的一个 url 即一个 url 列表,然后编写一个 CSV(其中 out_string 是一行)。

没有多线程的代码

my_list = ['http://stackoverflow.com/', 'http://google.com']

def main():
    with open('Extract.csv'), 'w') as out_file:
        count_loop = 0
        for x in my_list:
            #================  Get title ==================#
            out_string = ""
            campaign = parseCampaign(x)
            out_string += ';' + str(campaign.getTitle())

            #================ Get Profile ==================#
            if campaign.getTitle() != 'NA':
                creator = parseCreator(campaign.getCreatorUrl())
                out_string += ';' + str(creator.getCreatorProfileLinkUrl())
            else:
                pass
            #================ Write ==================#
            out_string += '\n'
            out_file.write(out_string) 
            count_loop +=1
            print '---- %s on %s ------- ' %(count_loop, len(my_list))

具有多线程但无法工作的代码

from threading import Thread
my_list = ['http://stackoverflow.com/', 'http://google.com']

def main(x):
    with open('Extract.csv'), 'w') as out_file:
        count_loop = 0
        for x in my_list:
            #================  Get title ==================#
            out_string = ""
            campaign = parseCampaign(x)
            out_string += ';' + str(campaign.getTitle())

            #================ Get Profile ==================#
            if campaign.getTitle() != 'NA':
                creator = parseCreator(campaign.getCreatorUrl())
                out_string += ';' + str(creator.getCreatorProfileLinkUrl())
            else:
                pass
            #================ Write ==================#
            out_string += '\n'
            out_file.write(out_string) 
            count_loop +=1
            print '---- %s on %s ------- ' %(count_loop, len(my_list))

for x in my_list:
    t = Thread(target=main, args=(x,))
    t.start()
    t2 = Thread(target=main, args=(x,))
    t2.start()

我找不到实现多个线程来运行这段代码的好方法,而且我有点困惑,因为文档不是很容易理解。单核,这段代码需要2个小时,多线程会节省我很多时间!

【问题讨论】:

  • 循环运行得更快,
  • 这是我的代码的较短版本,实际版本每个循环需要 7 到 10 秒,因为有很多请求(外部 API),所以如果 10s * 12 000 urls 可以显着改善我的处理器的每个核心都被使用,即 core1 = 10s * 3000url s+ core2 = 10s * 3000urls + core3 = 10s * 3000urls + core4 = 10s * 3000urls 同时...
  • 当你说多线程版本不工作时,你是什么意思。是否有错误,或者它只是没有运行得更快?检查这个问题,我不认为 ipython 使用多个内核,即使是线程模块。 stackoverflow.com/a/204150/5889975
  • python 没有针对多线程进行调整,原因有很多。试试multiprocessingasyncio
  • 请告诉我们“不工作”是什么意思。

标签: python multithreading python-2.7


【解决方案1】:

好的,让我们分解你的问题。

首先,您的 main() 方法处理文件的所有输入和输出。当您将 main 与 2 个线程一起使用时,两个线程都会完成相同的工作。您需要一种只处理一个输入并为该输入返回输出的方法。

def process_x(x):
    #================  Get title ==================#
    out_string = ""
    campaign = parseCampaign(x)
    out_string += ';' + str(campaign.getTitle())

    #================ Get Profile ==================#
    if campaign.getTitle() != 'NA':
        creator = parseCreator(campaign.getCreatorUrl())
        out_string += ';' + str(creator.getCreatorProfileLinkUrl())
    else:
        pass
    #================ Write ==================#
    out_string += '\n'
    return out_string

现在您可以在多个线程中调用此方法,并分别获取每个x 的输出。

from threading import Thread
my_list = ['http://stackoverflow.com/', 'http://google.com']
threads = list()
for x in my_list:
    t = Thread(target=process_x, args=(x,))
    t.start()

但问题是这将启动 n 个线程,其中 n 是 my_list 中的元素数。所以,在这里使用multiprocessing.Pool 会更好。所以改为使用

from multiprocessing import Pool
pool = Pool(processes=4)              # start 4 worker processes
result_list = pool.map(process_x, my_list)

result_list 这里将有所有列表的结果。所以现在你可以把它保存在文件中了。

with open('Extract.csv'), 'w') as out_file:
    out_file.writelines(result_list)

【讨论】:

  • Map 阻止执行。异步映射是更好的解决方案,而 applyasync 比两者更好(恕我直言)。
  • @Kostas 你是对的 Pool.map 阻塞,直到它处理所有输入列表。但是在这里,我认为 OP 不会在 applyasync 和将数据保存到文件之间做任何事情,为此我们需要关心 map 方法的阻塞。更多[Pool.applyasync(f, x) for x in my_list]; Pool.close(); Pool.join(); 在阻塞方面基本上等于Pool.map,逻辑上。
【解决方案2】:

嗯……答案是:

为什么要为同一个任务分配两个线程?

是:

让循环跑得更快

(见原帖的cmets)

那么这里有些不对劲。

亲爱的 OP,两个线程都会完全做同样的事情!这意味着第一个线程将与第二个线程完全相同。

您可以执行以下操作:

import multiprocessing

nb_cores = 2  # Put the correct amount

def do_my_process_for(this_argument):
  # Add the actual code
  pass

def main():

  pool = multiprocessing.Pool(processes=nb_cores)

  results_of_processes = [pool.apply_async(
      do_my_process, 
      args=(an_argument, ),
      callback=None
  ) for an_argument in arguments_list]

  pool.close()
  pool.join()

基本上,您可以认为每个进程/线程都有自己的“思想”。这意味着在您的代码中,第一个线程将执行main() 中为参数x 定义的过程(取自您在列表中的迭代),第二个线程将执行相同的任务(main() 中的那个) ) 再次为x

您需要将您的流程制定为具有一组输入参数和一组输出的过程。然后您可以创建多个进程,为每个进程提供所需的输入参数之一,然后该进程将使用适当的参数执行您的主程序。

希望对您有所帮助。另请参阅代码,我想您会理解的。

另见:

多处理映射和异步映射(我现在不记得确切的名称)

功能工具部分

【讨论】:

  • 如果我只有 1 个核心,我还能运行它并产生线程吗?
猜你喜欢
  • 1970-01-01
  • 2013-04-13
  • 2018-08-07
  • 1970-01-01
  • 2011-02-13
  • 1970-01-01
  • 1970-01-01
  • 2017-06-13
  • 1970-01-01
相关资源
最近更新 更多