【问题标题】:How to update a pandas dataframe, from multiple API calls如何从多个 API 调用更新熊猫数据框
【发布时间】:2021-01-15 12:15:14
【问题描述】:

我需要做一个python脚本来

  1. 读取包含列(person_idnameflag)的 csv 文件。该文件有 3000 行。
  2. 基于 csv 文件中的person_id,我需要调用一个通过person_id 的 URL 来执行 GET http://api.myendpoint.intranet/get-data/1234 该 URL 将返回person_id 的一些信息,如下例所示。我需要获取所有租金对象并保存在我的 csv 中。我的输出需要是这样的
import pandas as pd
import requests

ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))

for id in ids:
    response = request.get(f'endpoint/{id["person_id"]}')
    json = response.json()
    person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
    pd.read_csv(f"{path}/data.csv", delimiter=';' )
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active

响应示例

{
    "active": false,
    "ctodx": false,
    "rents": [{
            "carId": 6638,
            "price": 1000,
            "rentStatus": "active"
        }, {
            "carId": 5566,
            "price": 2000,
            "rentStatus": "active"
        }
    ],
    "responseCode": "OK",
    "status": [{
            "request": 345,
            "requestStatus": "F"
        }, {
            "requestId": 678,
            "requestStatus": "P"
        }
    ],
    "transaction": false
}
  1. 在 csv 上保存来自响应的附加数据后,我需要使用 URL 上的 carId 从另一个端点获取数据。里程结果必须保存在同一个csv中。 http://api.myendpoint.intranet/get-mileage/6638 http://api.myendpoint.intranet/get-mileage/5566

每次调用的返回都是这样的

{"mileage":1000.0000}
{"mileage":550.0000}

最终输出必须是

person_id;name;flag;cardId;price;rentStatus;mileage
1000;Joseph;1;6638;1000;active;1000.0000
1000;Joseph;1;5566;2000;active;550.0000

有人可以帮我写这个脚本吗? 可以与 pandas 或任何 python 3 库一起使用。

【问题讨论】:

    标签: python python-3.x pandas json-normalize


    【解决方案1】:

    代码说明

    • 使用pd.read_csv 创建数据框df
      • 预计'person_id' 中的所有值都是唯一的。
    • 'person_id' 上使用.apply,呼叫prepare_data
      • prepare_data 期望 'person_id'strint,如类型注释 Union[int, str] 所示
    • API 调用到prepare_data 函数,这将返回一个dict
    • dict'rents' 键转换为带有pd.json_normalize 的数据框。
    • 'carId' 上使用.apply,调用API,并将添加到数据框data 中的'mileage' 作为列提取。
    • 'person_id'添加到data,可用于将dfs合并。
    • pd.Seriess 转换为数据帧,使用pd.concat,然后将mergedfs 转换为person_id
    • 以所需形式保存到带有pd.to_csv 的csv。

    潜在问题

    • 如果有问题,最有可能出现在 call_api 函数中。
    • 只要call_api 返回dict,就像问题中显示的响应一样,其余代码将正常工作以产生所需的输出。
    import pandas as pd
    import requests
    import json
    from typing import Union
    
    def call_api(url: str) -> dict:
        r = requests.get(url)
        return r.json()
    
    def prepare_data(uid: Union[int, str]) -> pd.DataFrame:
        
        d_url = f'http://api.myendpoint.intranet/get-data/{uid}'
        m_url = 'http://api.myendpoint.intranet/get-mileage/'
        
        # get the rent data from the api call
        rents = call_api(d_url)['rents']
        # normalize rents into a dataframe
        data = pd.json_normalize(rents)
        
        # get the mileage data from the api call and add it to data as a column
        data['mileage'] = data.carId.apply(lambda cid: call_api(f'{m_url}{cid}')['mileage'])
        # add person_id as a column to data, which will be used to merge data to df
        data['person_id'] = uid
        
        return data
        
    
    # read data from file
    df = pd.read_csv('file.csv', sep=';')
    
    # call prepare_data
    s = df.person_id.apply(prepare_data)
    
    # s is a Series of DataFrames, which can be combined with pd.concat
    s = pd.concat([v for v in s])
    
    # join df with s, on person_id
    df = df.merge(s, on='person_id')
    
    # save to csv
    df.to_csv('output.csv', sep=';', index=False)
    
    • 如果运行此代码时出现任何错误:
      1. 发表评论,让我知道。
      2. edit 您的问题,并将整个 TraceBack 作为文本粘贴到代码块中。

    示例

    # given the following start dataframe
       person_id    name  flag
    0       1000  Joseph     1
    1        400     Sam     1
    
    # resulting dataframe using the same data for both id 1000 and 400
       person_id    name  flag  carId  price rentStatus  mileage
    0       1000  Joseph     1   6638   1000     active   1000.0
    1       1000  Joseph     1   5566   2000     active   1000.0
    2        400     Sam     1   6638   1000     active   1000.0
    3        400     Sam     1   5566   2000     active   1000.0
    

    【讨论】:

      【解决方案2】:

      有许多不同的方法来实现这一点。其中之一是,就像您在评论中开始的那样:

      • 用 pandas 读取 CSV 文件
      • 为每一行获取 person_id 并建立一个呼叫
      • 然后可以从租金中获取传递的 JSON 响应
      • 然后为每个单独的租赁提取 carId
      • 最终将其收集到 row_list 中
      • row_list 然后通过 pandas 转换回 csv

      没有任何错误处理的非常简单的解决方案可能如下所示:

      from types import SimpleNamespace
      
      import pandas as pd
      import requests
      import json
      
      path = '/some/path/'
      df = pd.read_csv(f'{path}/data.csv', delimiter=';')
      
      rows_list = []
      for _, row in df.iterrows():
          rentCall = f'http://api.myendpoint.intranet/get-data/{row.person_id}'
          print(rentCall)
          response = requests.get(rentCall)
          r = json.loads(response.text, object_hook=lambda d: SimpleNamespace(**d))
          for rent in r.rents:
              mileageCall = f'http://api.myendpoint.intranet/get-mileage/{rent.carId}'
              print(mileageCall)
              response2 = requests.get(mileageCall)
              m = json.loads(response2.text, object_hook=lambda d: SimpleNamespace(**d))
              state = "active" if r.active else "inactive"
              rows_list.append((row['person_id'], row['name'], row['flag'], rent.carId, rent.price, state, m.mileage))
      df = pd.DataFrame(rows_list, columns=('person_id', 'name', 'flag', 'carId', 'price', 'rentStatus', 'mileage'))
      print(df.to_csv(index=False, sep=';'))
      

      【讨论】:

        【解决方案3】:

        通过多处理加速

        您提到您有 3000 行,这意味着您必须进行大量 API 调用。根据连接的不同,这些调用中的每一个都可能需要一段时间。因此,以顺序方式执行此操作可能会太慢。大多数时候,您的程序只会等待来自服务器的响应,而不做任何其他事情。 我们可以通过使用multiprocessing 来提高这种性能。

        我使用来自Trenton his answer 的所有代码,但我替换了以下顺序调用:

        # call prepare_data
        s = df.person_id.apply(prepare_data)
        

        使用并行替代方案:

        from multiprocessing import Pool
        n_processes=20  # Experiment with this to see what works well
        with Pool(n_processes) as p:
          s=p.map(prepare_data, df.person_id)
        

        或者,线程池可能更快,但您必须通过将导入替换为 from multiprocessing.pool import ThreadPool as Pool.

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2016-10-07
          • 1970-01-01
          • 2018-04-15
          • 2023-02-23
          • 2018-02-13
          • 1970-01-01
          相关资源
          最近更新 更多