【问题标题】:Retrieve API data into dataframe using multi threading module使用多线程模块将 API 数据检索到数据帧中
【发布时间】:2020-10-14 07:18:16
【问题描述】:

我正在使用第三方 API 从大量天数中检索不同标签的 10 分钟数据。当前的数据提取可能需要几分钟时间,具体取决于天数和标签数量。因此,我正在尝试多线程,我知道这对于繁重的 IO 操作很有用。

API 调用如下(我已经替换了实际的 API 名称):

import numpy as N 
import requests as r 
import json 
import pandas as pd
from datetime import datetime 
import concurrent.futures

  
class pyGeneric: 
  
    def __init__(self, serverName, apiKey, rootApiUrl='/Generic.Services/api'): 
        """ 
        Initialize a connection to server, and return a pyGeneric server object 
        """ 
        self.baseUrl = serverName + rootApiUrl 
        self.apiKey = apiKey 
        self.bearer = 'Bearer ' + apiKey 
        self.header = {'mediaType':'application/json','Authorization':self.bearer} 
  
    def getRawMeasurementsJson(self, tag, start, end):
        apiQuery = '/measurements/' + tag + '/from/' + start + '/to/' + end + '?format=json' 
        dataresponse = r.get(self.baseUrl+apiQuery, headers=self.header) 
        data = json.loads(dataresponse.text) 
        return data 
                                                               
                                
    def getAggregatesPandas(self, tags, start, end):
        """        
        Return tag(s) in a pandas dataFrame
        """
        df = pd.DataFrame()
        if type(tags) == str:
            tags = [tags]
        for tag in tags:
            tempJson =  self.getRawMeasurementsJson(tag, start, end)
            tempDf = pd.DataFrame(tempJson['timeSeriesList'][0]['timeSeries'])
            name = tempJson['timeSeriesList'][0]['measurementName']
            df['TimeUtc'] = [datetime.fromtimestamp(i/1000) for i in tempDf['t']]
            df['TimeUtc'] = df['TimeUtc'].dt.round('min')
            df[name] = tempDf['v']
        return df
    

gener = pyGeneric('https://api.generic.com', 'auth_keymlkj9789878686')

对 API 的调用示例如下: gener_df = gener.getAggregatesPandas('tag1.10m.SQL', '*-10d', '*')

这适用于单个标签,但对于列表,这需要更长的时间,这就是我一直在尝试以下方法的原因:

tags = ['tag1.10m.SQL',
'tag2.10m.SQL',
'tag3.10m.SQL',
'tag4.10m.SQL',
'tag5.10m.SQL',
'tag6.10m.SQL',
'tag7.10m.SQL',
'tag8.10m.SQL',
'tag9.10m.SQL',
'tag10.10m.SQL']

startdate = "*-150d"
enddate = '*'

final_df = pd.DataFrame

with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    executor.map(lambda p: gener.getAggregatesPandas(*p), args)

但是我无法检查 gener.getAggregatesPandas 是否正确执行。最终,我想在一个名为 final_df 的数据框中获得结果,但也不确定如何进行。我在post 中读到,在上下文管理器中附加会导致数据帧的二次副本,因此最终会减慢速度。

【问题讨论】:

    标签: python pandas concurrent.futures


    【解决方案1】:

    你可以试试下面的方法,只要服务器也能处理,它很容易让你并行发出很多请求;

    # it's just a wrapper around concurrent.futures ThreadPoolExecutor with a nice tqdm progress bar!
    from tqdm.contrib.concurrent import thread_map, process_map # for multi-threading, multi-processing respectively)
    
    def chunk_list(lst, size):
        """
        From SO only; 
        Yield successive n-sized chunks from list.
        """
        for i in range(0, len(lst), size):
            yield lst[i:i + size]
    
    for idx, my_chunk in enumerate(chunk_list(huge_list, size=2**12)):
        for response in thread_map(<which_func_to_call>, my_chunk, max_workers=your_cpu_cores+6)):
            # which_func_to_call -> wrap the returned response json obj in this, etc
            # do something with the response now..
            # make sure to cache the chunk results as well
    

    编辑 1:

    from functools import partial
    startdate = "*-150d"
    enddate = '*'
    my_new_func = partial(which_func_to_call, startdate=startdate, enddate=enddate)
    

    现在我们可以使用这个函数了; 注意 -> my_new_func 现在接受单个参数..

    编辑 2:

    对于缓存,我建议使用 csv 模块并将您想要的响应写入 csv 文件而不是使用 pandas 等;或者您可以根据需要转储 JSON 响应等;类似 JSON/dict 的响应的示例代码如下所示,

    import csv
    import os
    
    with open(OUTPUT_FILE_NAME, "a+", newline="") as csvfile:
        # fieldnames = [your_headers_list]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        # Make sure you write the header only once as we are opening the file in append mode (writer.writeheader())
        for idx, my_chunk in enumerate(chunk_list(<huge_list>, size=CHUNK_SIZE)):
                for response in thread_map(
                    <my_partial_wrapped_func>, my_chunk, max_workers=min(32, os.cpu_count() + 6)
                ):
                # .......
                # .......
                writer.writerow(<row_of_the_csv_as_a_dict_with_fieldnames_as_keys>)
    

    【讨论】:

    • 感谢您的回复和 tqdm 库,我不知道!我不完全确定如何使您的代码适应我的问题。我了解 chunk_list 是一个将处理输入列表的生成器。但是 实际上有三个参数,我不知道如何在这里导入它们。最后我应该如何缓存块结果?
    • @amphinomos 如果您的start_dateenddate 是常量,那么您也可以使用functools.partials 创建一个伪函数并使用它代替&lt; which_function_to_call&gt;;我已经更新了答案,看看吧!
    • 我再次感谢编写代码的努力,我知道它更适合并行处理而不是多线程。老实说,它在我头上飞过,我无法让它与我现有的代码一起工作。
    【解决方案2】:

    据我了解,您需要了解 getAggregatesPandas 是否正确执行。

    你可以像下面那样做。

    with concurrent.futures.ThreadPoolExecutor() as executor:
        args = ((i,startdate, enddate) for i in tags)
        results = executor.map(lambda p: gener.getAggregatesPandas(*p), args)
        for result in results:
            final_df.append(result,ignore_index=False)
        #another approach is below
        #for f in concurrent.futures.as_completed(results):
        #     final_df.append(result,ignore_index=False)
    

    参考视频:-video

    【讨论】:

    • 我能够使用:final_df = pd.DataFrame(columns=['TimeUtc']) 在上下文管理器之前完成这项工作,然后 final_df = pd.merge(final_df, result_df, on = 'TimeUtc ', how = 'outer') 在 for 循环中。
    • @amphinomos 这正是您想要做的,但这将导致您已经知道的二次复制;我想我误读了您的要求;重新阅读您的问题时,此答案符合您的要求;与其使用 pandas 附加结果,不如将它们移动到列表中,然后只调用一次 df.append;我分享的答案将帮助您更快地向 API 发出 // 请求,然后将它们写入 csv 文件而不是使用 pandas;因为慢的不是数据帧的合并,可能是响应速度;这就是我的答案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-01-12
    • 1970-01-01
    • 2023-03-06
    • 2020-11-28
    • 1970-01-01
    • 1970-01-01
    • 2016-09-15
    相关资源
    最近更新 更多