【Question Title】: Reading different sets of JSON files at the same time with Python
【Posted】: 2021-04-30 07:06:40
【Question】:
I have two sets of files, b and c (JSON). The number of files in each set is usually between 500 and 1000. Right now I am reading them one at a time. Can I read them at the same time using multithreading? I have enough memory and processors.
yc = ...  # number of c files
yb = ...  # number of b files

c_output_transaction_list = []
for num in range(yc):
    c_json_file = './output/d_c_' + str(num) + '.json'
    print(c_json_file)
    c_transaction_list = json.load(open(c_json_file))['data']['transaction_list']
    c_output_transaction_list.extend(c_transaction_list)
df_res_c = pd.DataFrame(c_output_transaction_list)

b_output_transaction_list = []
for num in range(yb):
    b_json_file = './output/d_b_' + str(num) + '.json'
    print(b_json_file)
    b_transaction_list = json.load(open(b_json_file))['data']['transaction_list']
    b_output_transaction_list.extend(b_transaction_list)
df_res_b = pd.DataFrame(b_output_transaction_list)
【Comments】:
Tags:
python
json
python-3.x
pandas
multithreading
【Solution 1】:
I use this approach to read hundreds of files in parallel into a final dataframe. Without your data, you will have to verify that it does what you want. Reading the multiprocessing docs will help. I use the same code on Linux (an AWS EC2 instance reading S3 files) and on Windows reading the same S3 files, and I find it saves a lot of time. Note that multiprocessing is usually a better fit than multithreading here, since parsing the JSON is CPU-bound work.
import os
import json
import pandas as pd
from multiprocessing import Pool

# You can set the number of processes yourself or just take cpu_count from
# the os module. Tuning this does make a difference; for me, using the max
# isn't always the fastest overall.
num_proc = os.cpu_count()

# Define the function that creates a dataframe from a single file.
# Note this differs from your version, where you build one list and
# create the dataframe at the end.
def json_parse(c_json_file):
    c_transaction_list = json.load(open(c_json_file))['data']['transaction_list']
    return pd.DataFrame(c_transaction_list)

# This is the multiprocessing function that feeds the file names to the
# parsing function. If you don't pass num_proc, it defaults to 4.
def json_multiprocess(fn_list, num_proc=4):
    with Pool(num_proc) as pool:
        # map passes each file name as a single argument (chunksize 15);
        # use starmap with zip() if you need to pass more than the file name.
        r = pool.map(json_parse, fn_list, 15)
        pool.close()
        pool.join()
    return r

# Build your file list first.
yc = ...  # number of c files
flist = []
for num in range(yc):
    c_json_file = './output/d_c_' + str(num) + '.json'
    flist.append(c_json_file)

# Get a list of your intermediate dataframes.
# (On Windows, this call must live under an `if __name__ == '__main__':` guard.)
dfs = json_multiprocess(flist, num_proc=num_proc)
# Concat them into one dataframe.
df_res_c = pd.concat(dfs)
Then do the same for your next set of files...
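The file list for the b set (or any similarly named set) can also be built with glob instead of counting files by hand. A small self-contained sketch; the directory and file-name pattern follow the question, while `build_file_list`, its numeric sort key, and the temp-directory demo are illustrative additions:

```python
import glob
import os
import tempfile

def build_file_list(pattern):
    # Sort by the numeric suffix so d_b_2.json sorts before d_b_10.json
    # (plain string sort would put d_b_10 first).
    def num_key(path):
        stem = os.path.splitext(os.path.basename(path))[0]  # e.g. 'd_b_10'
        return int(stem.rsplit('_', 1)[1])
    return sorted(glob.glob(pattern), key=num_key)

# Tiny demo with a temp directory standing in for ./output
tmp = tempfile.mkdtemp()
for num in (0, 2, 10):
    open(os.path.join(tmp, 'd_b_%d.json' % num), 'w').close()

b_flist = build_file_list(os.path.join(tmp, 'd_b_*.json'))
print([os.path.basename(p) for p in b_flist])
# ['d_b_0.json', 'd_b_2.json', 'd_b_10.json']
```

The resulting list can be fed straight to json_multiprocess in place of the hand-built flist.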
Use the example in Aelarion's comment to help structure the files.
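Since the question asks specifically about multithreading: threads can also work here, because opening and reading the files is largely I/O-bound (the JSON parsing itself still holds the GIL), and they avoid multiprocessing's Windows `if __name__ == '__main__':` requirement. A minimal self-contained sketch using `concurrent.futures.ThreadPoolExecutor`; the `read_all` helper and the generated demo files are illustrative, not part of the answer above:

```python
import json
import os
import tempfile
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

def json_parse(path):
    # Parse one file into a dataframe (same shape as the answer's helper).
    with open(path) as f:
        return pd.DataFrame(json.load(f)['data']['transaction_list'])

def read_all(paths, max_workers=8):
    # Threads overlap the file I/O; map preserves input order.
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        return pd.concat(ex.map(json_parse, paths), ignore_index=True)

# Tiny demo: generated files standing in for ./output/d_c_*.json
tmp = tempfile.mkdtemp()
paths = []
for num in range(3):
    p = os.path.join(tmp, 'd_c_%d.json' % num)
    with open(p, 'w') as f:
        json.dump({'data': {'transaction_list': [{'id': num, 'amount': num * 10}]}}, f)
    paths.append(p)

df = read_all(paths)
print(len(df))  # 3 rows, one per file
```

Whether threads or processes win depends on how much time is spent in disk/network I/O versus parsing, so it is worth timing both on your data.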