【问题标题】:Multi-threading In Python to open more than thousand of urls and processing them faster多线程在 Python 中打开数千个 url 并更快地处理它们
【发布时间】:2018-04-01 01:48:06
【问题描述】:

我已经编写了一个python脚本来打开arround 1k url并对其进行处理以获得所需的结果,但似乎即使引入了多线程它的工作速度也很慢,并且在处理了一些url之后,该过程似乎被挂起,我无法确定它是否仍在运行或停止。如何创建多个线程以更快地处理它们。任何帮助将不胜感激。在此先感谢。下面是我的脚本。

import threading
from multiprocessing.pool import ThreadPool
from selenium import webdriver
from selenium.webdriver.phantomjs.service import Service
from selenium.webdriver.common.desired_capabilities import 
DesiredCapabilities
from selenium.webdriver.remote.webdriver import WebDriver as 
RemoteWebDriver
from multiprocessing.dummy import Pool  # This is a thread-based Pool
from multiprocessing import cpu_count
import csv

def fetch_url(url):
    driver = webdriver.PhantomJS()
    driver.get(url)
    html = driver.page_source
    print(html)
    print("'%s\' fetched in %ss" % (url[0], (time.time() - start)))

def thread_task(lock,data_set):
    lock.acquire()
    fetch_url(url)
    lock.release()

if __name__ == "__main__":
    data_set = []
    with open('file.csv', 'r') as csvfile:
        spamreader = csv.reader(csvfile, delimiter=' ', quotechar='|')
        for row in spamreader:
            data_set.append(row)

    lock = threading.Lock()
    # data set will contain a list of 1k urls
    for url in data_set:
        t1 = threading.Thread(target=thread_task, args=(lock,url,))
        # start threads
        t1.start()

        # wait until threads finish their job
        t1.join()

    print("Elapsed Time: %s" % (time.time() - start))

【问题讨论】:

  • 您正在使用一个锁进行所有处理。你为什么要使用锁?
  • 我可能会推荐使用当前执行此操作的 requests_futures
  • 嗨,Alex,我的系统在运行多个线程时变得很慢,因为我们可以在这里看到它为每个 url 创建一个线程,因此我添加了锁。让我知道我需要做任何更改。
  • 嗨 Shailyn,我对 requests_futures 没有任何想法,您能否通过一些示例让我了解更多信息。

标签: python multithreading


【解决方案1】:

您首先通过在开始下一个循环之前等待for url in data_set: 循环中的每个线程完成,然后使用锁来一次只让fetch_url 函数的一个实例运行,从而击败了多线程。您已经导入了ThreadPool,它是完成这项工作的合理工具。这是你如何使用它

import threading
from multiprocessing.pool import ThreadPool
from selenium import webdriver
from selenium.webdriver.phantomjs.service import Service
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
import csv

def fetch_url(url):
    driver = webdriver.PhantomJS()
    driver.get(url)
    html = driver.page_source
    print(html)
    print("'%s\' fetched in %ss" % (url[0], (time.time() - start)))

def thread_task(lock,data_set):
    lock.acquire()
    fetch_url(url)
    lock.release()

if __name__ == "__main__":
    start = time.time()
    with open('file.csv', 'r') as csvfile:
        dataset = list(csv.reader(csvfile, delimiter=' ', quotechar='|'))

    # guess a thread pool size which is a tradeoff of number of cpu cores,
    # expected wait time for i/o and memory size.

    with ThreadPool(20) as pool:
        pool.map(fetch_url, dataset, chunksize=1)

    print("Elapsed Time: %s" % (time.time() - start))

【讨论】:

  • 嗨Tdelaney,谢谢你的帮助和建议,我会试试这段代码,也请你帮我理解一下,ThreadPool(20)它实际上会做什么,假设我有一个数据集在 1000 个 url 中,它将根据数据集创建多少个线程。
  • 它创建了 20 个线程,chunksize 告诉它在完成前一个线程时一次将一个 URL 传递给每个工作者。抓取工具花费大量时间等待网站响应,因此如果实际抓取速度很快,您可以拥有比可用 CPU 内核更多(等待)的工作人员。制作一个动态调整负载的系统更复杂 - 线程池是一种快速而肮脏的方式来完成它。
  • 所以这意味着 20 个 url 将分配给 20 个线程,一旦再次完成,它将分配下一个 20 等等..?
  • 大多数情况下... 1 个 url 将分配给 20 个线程中的每一个,因此 20 个将在进行中。当每个线程完成时,它会抓取下一个 url,因此所有 20 个线程将始终工作。以 20 个为一组进行操作会很糟糕,因为空闲线程必须等待最慢的线程完成。
  • 感谢您的澄清,我现在已经理解了,那么最好的做法是什么,请建议我,因为现在我正在运行您提供的脚本,我发现它更快现在与我之前运行的脚本进行比较。
猜你喜欢
  • 2018-04-11
  • 1970-01-01
  • 2018-01-05
  • 1970-01-01
  • 1970-01-01
  • 2023-03-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多