【问题标题】:How to get concurrent.futures ProcessPoolExecutor work with a dictionary?如何让 concurrent.futures ProcessPoolExecutor 与字典一起工作?
【发布时间】:2021-10-21 21:05:06
【问题描述】:

我在youtube上看了python多处理教程,这里是链接https://www.youtube.com/watch?v=fKl2JW_qrso&t=2316s&ab_channel=CoreySchafer

然后,我尝试在我的代码中应用该方法,这是我在应用多处理之前的代码:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import investpy
from pandas import Timestamp
import json
from pandas.io.json import json_normalize
import time

def getCurrency_data():
    """Scrape ten USD currency-pair pages one after another into a DataFrame.

    Each URL in the local ``links`` dict is fetched with ``requests.get`` and
    parsed with BeautifulSoup; the scraped text values are collected in
    per-column lists that become the DataFrame columns.

    Returns:
        A pandas DataFrame with one row per entry of ``links`` and the columns
        'Currency', 'Current', 'Change', 'Prev. Close', 'Open' and
        '1 Year Change'. All values are text as found in the page.

    Raises:
        IndexError: if a page yields fewer matching spans than are indexed below.
        AttributeError: if the overview table div is not found (``soup.find``
            then returns None).
    """
    # Browser-like User-Agent sent with every request.
    user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
    # Currency pair name -> page URL.
    links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr", 
             "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
             "USD-CNY":"https://www.investing.com/currencies/usd-cny", 
             "USD-EUR":"https://www.investing.com/currencies/usd-eur",
             "USD-SGD":"https://www.investing.com/currencies/usd-sgd", 
             "USD-THB":"https://www.investing.com/currencies/usd-thb",
             "USD-MXN":"https://www.investing.com/currencies/usd-mxn", 
             "USD-MYR":"https://www.investing.com/currencies/usd-myr",
             "USD-KRW":"https://www.investing.com/currencies/usd-krw", 
             "USD-INR":"https://www.investing.com/currencies/usd-inr"}
    df_currency= pd.DataFrame(columns =['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change'])
    # One list per output column; each loop iteration appends one value to each.
    currency=[]
    current=[]
    change=[]
    prev_close=[]
    open_=[]
    oneyear_change=[]
    for key, value in links.items():
        data = requests.get(value, headers={'User-Agent': user_agent})
        soup = BeautifulSoup(data.content, 'html.parser')
        # Gather the text of every <span> inside the 'top bold inlineblock'
        # divs; the first two are used as the current value and the change.
        tags1 = soup.find_all('div', {'class':'top bold inlineblock'})
        span_tag =  []
        for div in tags1:
            spans = div.find_all('span')
            for span in spans:       
                x = span.text
                span_tag.append(x)
        current_tmp = span_tag[0]
        change_tmp = span_tag[1]
        
        current.append(current_tmp)
        change.append(change_tmp)
        currency_tmp = key
        currency.append(currency_tmp)
        
        # Gather the bold spans of the overview table; positions 0, 1 and 2
        # are read as previous close, open and one-year change.
        # NOTE(review): this ordering depends on the site's markup - confirm.
        cur = []
        tags2 = soup.find('div', {'class':'clear overviewDataTable overviewDataTableWithTooltip'})
        for a in tags2.findAll('div', {'class':'first inlineblock'}):
            for b in a.findAll('span', {'class':'float_lang_base_2 bold'}):
                cur.append(b.text)
        prevclose_tmp = cur[0]
        open_tmp = cur[1]
        oneyearchange_tmp = cur[2]
        
        prev_close.append(prevclose_tmp)
        open_.append(open_tmp)
        oneyear_change.append(oneyearchange_tmp)
        
        
    df_currency["Currency"] = currency
    df_currency["Current"] = current
    df_currency["Change"] = change
    df_currency["Prev. Close"] = prev_close
    df_currency["Open"] = open_
    df_currency["1 Year Change"] = oneyear_change
    return(df_currency)

好吧,但我很困惑。在那个视频中,输入是一个列表,而我使用字典......这是应用多处理后的代码:

import requests
from bs4 import BeautifulSoup
import pandas as pd
import investpy
from pandas import Timestamp
import json
from pandas.io.json import json_normalize
import time
import concurrent.futures

# Start of the wall-clock measurement that is printed at the end of the script.
t1 = time.perf_counter()

# Currency pair name -> investing.com page URL; read as a global by
# getCurrency_data and also passed to executor.map below.
links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr", 
         "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
         "USD-CNY":"https://www.investing.com/currencies/usd-cny", 
         "USD-EUR":"https://www.investing.com/currencies/usd-eur",
         "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
         "USD-THB":"https://www.investing.com/currencies/usd-thb",
         "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
         "USD-MYR":"https://www.investing.com/currencies/usd-myr",
         "USD-KRW":"https://www.investing.com/currencies/usd-krw",
         "USD-INR":"https://www.investing.com/currencies/usd-inr"}

def getCurrency_data(link):
    """Scrape every page in the module-level ``links`` dict into a DataFrame.

    NOTE(review): ``link`` is never read. The body loops over all of
    ``links``, so each call made through ``executor.map`` repeats the whole
    sequential scrape instead of handling a single URL.

    Args:
        link: value supplied by the caller; unused by the body.

    Returns:
        A pandas DataFrame with one row per entry of ``links`` and the columns
        'Currency', 'Current', 'Change', 'Prev. Close', 'Open' and
        '1 Year Change'. All values are text as found in the page.

    Raises:
        IndexError: if a page yields fewer matching spans than are indexed below.
        AttributeError: if the overview table div is not found (``soup.find``
            then returns None).
    """
    # Browser-like User-Agent sent with every request.
    user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
    
    df_currency= pd.DataFrame(columns =['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change'])
    # One list per output column; each loop iteration appends one value to each.
    currency=[]
    current=[]
    change=[]
    prev_close=[]
    open_=[]
    oneyear_change=[]
    # Iterates the global dict, not the ``link`` argument.
    for key, value in links.items():
        data = requests.get(value, headers={'User-Agent': user_agent})
        soup = BeautifulSoup(data.content, 'html.parser')
        # Gather the text of every <span> inside the 'top bold inlineblock'
        # divs; the first two are used as the current value and the change.
        tags1 = soup.find_all('div', {'class':'top bold inlineblock'})
        span_tag =  []
        for div in tags1:
            spans = div.find_all('span')
            for span in spans:       
                x = span.text
                span_tag.append(x)
        current_tmp = span_tag[0]
        change_tmp = span_tag[1]
        
        current.append(current_tmp)
        change.append(change_tmp)
        currency_tmp = key
        currency.append(currency_tmp)
        
        # Gather the bold spans of the overview table; positions 0, 1 and 2
        # are read as previous close, open and one-year change.
        # NOTE(review): this ordering depends on the site's markup - confirm.
        cur = []
        tags2 = soup.find('div', {'class':'clear overviewDataTable overviewDataTableWithTooltip'})
        for a in tags2.findAll('div', {'class':'first inlineblock'}):
            for b in a.findAll('span', {'class':'float_lang_base_2 bold'}):
                cur.append(b.text)
        prevclose_tmp = cur[0]
        open_tmp = cur[1]
        oneyearchange_tmp = cur[2]
        
        prev_close.append(prevclose_tmp)
        open_.append(open_tmp)
        oneyear_change.append(oneyearchange_tmp)
        
        
    df_currency["Currency"] = currency
    df_currency["Current"] = current
    df_currency["Change"] = change
    df_currency["Prev. Close"] = prev_close
    df_currency["Open"] = open_
    df_currency["1 Year Change"] = oneyear_change
    return(df_currency)

# NOTE(review): iterating a dict yields its keys, so each call receives a pair
# name such as "USD-IDR" rather than a URL. The iterator returned by
# executor.map() is also discarded, so no DataFrame is ever collected.
# NOTE(review): there is no `if __name__ == '__main__':` guard around the pool;
# the multiprocessing docs call for one when workers are started by spawning.
with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(getCurrency_data, links)

# End of the wall-clock measurement started at t1.
t2 = time.perf_counter()

print(f'Finished in {t2-t1} secondes')

我的代码有问题吗?我已经尝试过了,但没有任何结果。谢谢。

【问题讨论】:

    标签: python multithreading dictionary multiprocessing concurrent.futures


    【解决方案1】:

    这个问题也可以使用多线程而不是多处理来解决,因为在getCurrency_data 中花费的大部分时间都在等待数据从您的requests.get 请求中返回,因此线程之间几乎没有竞争全局解释器锁。但是由于BeautifulSoup 对返回的数据进行了一些 CPU 密集型处理,因此 GIL 总会存在一些争用,这表明:

    (1) 多处理的性能可能略优于多线程,但前提是您创建的进程数与需要检索的 URL 数相同,因为您的“工作”函数大部分时间都在等待;(2) 您应该使用 requests.Session 实例来检索 URL,因为所有 URL 都指向同一个网站,这样做可以提高效率。

    要将您的程序转换为多处理或多线程(尝试两种方式 - 您只需将 ProcessPoolExecutor 更改为 ThreadPoolExecutor,但我发现多处理的性能稍高一些),函数 getCurrency_data 应该只处理一个 URL 并将其检索到的数据返回给主进程。然后是主进程累积所有子进程返回的数据并初始化数据帧:

    import requests
    from bs4 import BeautifulSoup
    import pandas as pd
    #import investpy
    #from pandas import Timestamp
    #import json
    #from pandas.io.json import json_normalize
    import time
    import concurrent.futures
    from functools import partial
    
    def getCurrency_data(session, item, timeout=30):
        """Fetch one currency page and extract its headline figures.

        Handles exactly one ``(name, url)`` pair and returns plain strings.

        Args:
            session: ``requests.Session`` (or compatible object) used for the GET.
            item: ``(key, url)`` pair, e.g. one element of ``links.items()``.
            timeout: seconds passed to ``session.get`` as its ``timeout``.

        Returns:
            Tuple ``(currency, current, change, prev_close, open, one_year_change)``.
            ``currency`` is ``key``; the other values are text scraped from the page.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            requests.Timeout: if the server does not answer within ``timeout``.
            IndexError: if the page has fewer matching spans than are indexed below.
            AttributeError: if the overview table div is not found.
        """
        key, url = item

        response = session.get(url, timeout=timeout)
        # Fail here on an error page instead of with an obscure IndexError or
        # AttributeError while parsing it.
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Text of every <span> inside the 'top bold inlineblock' divs; the
        # first two are the current value and the change.
        headline = [
            span.text
            for div in soup.find_all('div', {'class': 'top bold inlineblock'})
            for span in div.find_all('span')
        ]
        current, change = headline[0], headline[1]

        # Bold spans of the overview table; positions 0, 1 and 2 are read as
        # previous close, open and one-year change.
        overview = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        stats = [
            span.text
            for cell in overview.find_all('div', {'class': 'first inlineblock'})
            for span in cell.find_all('span', {'class': 'float_lang_base_2 bold'})
        ]
        prev_close, open_, one_year_change = stats[0], stats[1], stats[2]

        return key, current, change, prev_close, open_, one_year_change
    
    
    def main():
        """Scrape all currency pages with one worker process per URL.

        Maps ``getCurrency_data`` over ``links.items()`` in a
        ``ProcessPoolExecutor``, builds a DataFrame from the returned tuples,
        then prints the elapsed time and the DataFrame.
        """
        t1 = time.perf_counter()

        # Currency pair name -> investing.com page URL.
        links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
                 "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
                 "USD-CNY":"https://www.investing.com/currencies/usd-cny",
                 "USD-EUR":"https://www.investing.com/currencies/usd-eur",
                 "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
                 "USD-THB":"https://www.investing.com/currencies/usd-thb",
                 "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
                 "USD-MYR":"https://www.investing.com/currencies/usd-myr",
                 "USD-KRW":"https://www.investing.com/currencies/usd-krw",
                 "USD-INR":"https://www.investing.com/currencies/usd-inr"}
        columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

        with requests.Session() as session:
            user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
            # update() keeps the session's default headers; assigning a new
            # dict to session.headers would discard them.
            session.headers.update({'User-Agent': user_agent})
            # NOTE(review): partial() binds the session into the callable that
            # is sent to the worker processes, so presumably each worker uses
            # its own copy - confirm before relying on connection reuse.
            fetch = partial(getCurrency_data, session)
            # One process per URL, since the workers mostly wait on the network.
            with concurrent.futures.ProcessPoolExecutor(max_workers=len(links)) as executor:
                # map() yields results in the order of links.items(); each
                # result is a 6-tuple in the same order as `columns`.
                rows = list(executor.map(fetch, links.items()))
            df_currency = pd.DataFrame(rows, columns=columns)

        t2 = time.perf_counter()

        print(f'Finished in {t2-t1} seconds')

        print(df_currency)
    
    # The guard stops worker processes from re-running main() when they import
    # this module. It is required wherever processes are started with the
    # "spawn" method, which is the default on Windows.
    if __name__ == '__main__':
        main()
    

    打印:

    Finished in 4.4468559 seconds
      Currency   Current   Change Prev. Close      Open 1 Year Change
    0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
    1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
    2  USD-CNY    6.5006  +0.0064      6.4942    6.4951       - 6.13%
    3  USD-EUR    0.8564   0.0001      0.8563    0.8565         1.33%
    4  USD-SGD    1.3628  -0.0014      1.3643     1.364       - 0.44%
    5  USD-THB    33.370   +0.020       33.35     33.34         6.54%
    6  USD-MXN   20.3829  +0.2309      20.152    20.152       - 8.88%
    7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
    8  USD-KRW  1,182.31    +6.03    1,176.28  1,175.46       - 0.69%
    9  USD-INR    74.400   +0.030       74.37     74.38       - 0.62%
    

    两全其美

    由于创建进程有相当大的开销,因此更高效的做法是把需要完成的工作分成两部分:主要是 I/O 的部分(即检索 URL)和主要是 CPU 的部分(解析和处理检索到的 HTML 文档),前者使用多线程,后者使用多处理。这样一来,您就不会创建比实际需要更多的进程。

    和以前一样,多线程池的大小应该等于需要检索的 URL 的数量(只要该数量不是不合理大;创建数百个线程应该不是问题)并且多处理池大小应最多使用您拥有的 CPU 内核数。因此,我们创建了两个池并将多处理池传递给我们的工作函数,该函数检索 URL,然后将数据提交到多处理池以解析和处理该数据。

    在下面的代码中,我创建的多处理池大小等于我拥有的物理核心数 (4),即逻辑核心数的一半;如果不指定大小,默认池大小将是逻辑核心数。要动态确定此值,您可以从 PyPI 存储库安装 psutil 包:

    import requests
    from bs4 import BeautifulSoup
    import pandas as pd
    #import investpy
    #from pandas import Timestamp
    #import json
    #from pandas.io.json import json_normalize
    import time
    import concurrent.futures
    from functools import partial
    import psutil
    
    def process_data(key, data):
        """Parse one downloaded currency page and extract its headline figures.

        Args:
            key: currency pair name; returned unchanged as the first element.
            data: HTML of the page, as passed to ``BeautifulSoup``.

        Returns:
            Tuple ``(currency, current, change, prev_close, open, one_year_change)``.
            ``currency`` is ``key``; the other values are text scraped from the page.

        Raises:
            IndexError: if the page has fewer matching spans than are indexed below.
            AttributeError: if the overview table div is not found.
        """
        soup = BeautifulSoup(data, 'html.parser')

        # Text of every <span> inside the 'top bold inlineblock' divs; the
        # first two are the current value and the change.
        headline = [
            span.text
            for div in soup.find_all('div', {'class': 'top bold inlineblock'})
            for span in div.find_all('span')
        ]
        current, change = headline[0], headline[1]

        # Bold spans of the overview table; positions 0, 1 and 2 are read as
        # previous close, open and one-year change.
        overview = soup.find('div', {'class': 'clear overviewDataTable overviewDataTableWithTooltip'})
        stats = [
            span.text
            for cell in overview.find_all('div', {'class': 'first inlineblock'})
            for span in cell.find_all('span', {'class': 'float_lang_base_2 bold'})
        ]
        prev_close, open_, one_year_change = stats[0], stats[1], stats[2]

        return key, current, change, prev_close, open_, one_year_change
    
    
    def getCurrency_data(session, pool_executor, item, timeout=30):
        """Download one page in the caller's thread and parse it in the process pool.

        Args:
            session: ``requests.Session`` (or compatible object) used for the GET.
            pool_executor: executor whose ``submit`` runs ``process_data``.
            item: ``(key, url)`` pair, e.g. one element of ``links.items()``.
            timeout: seconds passed to ``session.get`` as its ``timeout``.

        Returns:
            Whatever ``process_data(key, content)`` returns for this page.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            requests.Timeout: if the server does not answer within ``timeout``.
            Exception: any exception raised by ``process_data`` is re-raised by
                ``Future.result()``.
        """
        key, url = item

        response = session.get(url, timeout=timeout)
        # Do not ship an error page to the process pool; fail here instead.
        response.raise_for_status()
        # Hand the parsing to the process pool and block this thread until the
        # parsed tuple comes back.
        future = pool_executor.submit(process_data, key, response.content)
        return future.result()
    
    def main():
        """Scrape all currency pages: threads download, processes parse.

        Maps ``getCurrency_data`` over ``links.items()`` in a thread pool with
        one thread per URL. Each call receives the process pool as well, so the
        parsing runs there. Builds a DataFrame from the returned tuples, then
        prints the elapsed time and the DataFrame.
        """
        t1 = time.perf_counter()

        # Currency pair name -> investing.com page URL.
        links = {"USD-IDR":"https://www.investing.com/currencies/usd-idr",
                 "USD-JPY":"https://www.investing.com/currencies/usd-jpy",
                 "USD-CNY":"https://www.investing.com/currencies/usd-cny",
                 "USD-EUR":"https://www.investing.com/currencies/usd-eur",
                 "USD-SGD":"https://www.investing.com/currencies/usd-sgd",
                 "USD-THB":"https://www.investing.com/currencies/usd-thb",
                 "USD-MXN":"https://www.investing.com/currencies/usd-mxn",
                 "USD-MYR":"https://www.investing.com/currencies/usd-myr",
                 "USD-KRW":"https://www.investing.com/currencies/usd-krw",
                 "USD-INR":"https://www.investing.com/currencies/usd-inr"}
        columns = ['Currency', 'Current', 'Change', 'Prev. Close', 'Open', '1 Year Change']

        with requests.Session() as session:
            user_agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.37"
            # update() keeps the session's default headers; assigning a new
            # dict to session.headers would discard them.
            session.headers.update({'User-Agent': user_agent})
            # Size the process pool by physical cores.
            # NOTE(review): psutil documents that cpu_count() may return None
            # when undetermined; ProcessPoolExecutor then uses its default size.
            physical_cores = psutil.cpu_count(logical=False)
            with concurrent.futures.ProcessPoolExecutor(physical_cores) as pool_executor, \
                    concurrent.futures.ThreadPoolExecutor(max_workers=len(links)) as executor:
                # NOTE(review): the single session is used from several threads
                # at once - confirm this is acceptable for the requests version in use.
                fetch = partial(getCurrency_data, session, pool_executor)
                # map() yields results in the order of links.items(); each
                # result is a 6-tuple in the same order as `columns`.
                rows = list(executor.map(fetch, links.items()))
            df_currency = pd.DataFrame(rows, columns=columns)

        t2 = time.perf_counter()

        print(f'Finished in {t2-t1} seconds')

        print(df_currency)
    
    # The guard stops worker processes from re-running main() when they import
    # this module. It is required wherever processes are started with the
    # "spawn" method, which is the default on Windows.
    if __name__ == '__main__':
        main()
    

    打印:

    Finished in 3.5800665 seconds
      Currency   Current   Change Prev. Close      Open 1 Year Change
    0  USD-IDR  14,452.5    +52.5      14,400    14,450       - 2.49%
    1  USD-JPY    109.81    +0.09      109.72    109.73         3.47%
    2  USD-CNY    6.5015  +0.0073      6.4942    6.4951       - 6.13%
    3  USD-EUR    0.8545  -0.0018      0.8563    0.8565         1.33%
    4  USD-SGD    1.3615  -0.0027      1.3643     1.364       - 0.44%
    5  USD-THB    33.360   +0.010       33.35     33.34         6.54%
    6  USD-MXN   20.4000  +0.2480      20.152    20.152       - 8.88%
    7  USD-MYR    4.2375  +0.0005       4.237    4.2395         1.67%
    8  USD-KRW  1,177.58    +1.30    1,176.28  1,175.46       - 0.69%
    9  USD-INR    74.352   -0.018       74.37     74.38       - 0.62%
    

    【讨论】:

    猜你喜欢
    • 2023-02-02
    • 2017-06-25
    • 2020-12-13
    • 2012-02-02
    • 2011-02-22
    • 2013-03-24
    • 2016-07-22
    • 2011-08-17
    • 2021-12-18
    相关资源
    最近更新 更多