【问题标题】:pandas gets stuck when trying to read from bigquery尝试从 bigquery 读取时,熊猫卡住了
【发布时间】:2019-07-07 11:14:55
【问题描述】:

我在大查询中有一个相当大的表(app. 9M 行),我想通过 pandas 读取它。

我尝试阅读和使用[pd.read_gbq()][1] 函数,该函数在小桌子上运行良好。

在大桌子上它在 50 秒左右后卡住(日志显示 elapsed .. 50s) - 没有给出错误或任何东西。

我的问题是如何使用 pd(块?)读取该表。任何有关扩大这些 bigquery 读取的约定都会有所帮助。

编辑/分辨率

添加到 Khan 的答案,我最终实现了块,每次将 500,000 写入文件,然后将这些文件读取到数据框,如下所示:

def download_gbq_table(self):
    if not os.path.exists(self.tmp_dir):
        os.makedirs(self.tmp_dir)
    increment = 100000

    intervals = list(range(0, self.table_size, 100000))
    intervals.append(self.table_size - intervals[len(intervals)-1])

    df = pd.DataFrame()

    for offset in intervals:
        query = f"select * from `<table_name>` limit {increment} offset {offset};"
        logger.info(f"running query: {query}")
        start_time = time.time()
        tmp_df = pd.read_gbq(query,
                       project_id=self.connection_parameters['project_id'],
                       private_key=self.connection_parameters['service_account'],
                       dialect='standard'
                        )
        df = pd.concat([df, tmp_df])
        logger.info(f'time took: {str(round(time.time() - start_time, 2))}')
        if len(df) % 500000 == 0:
            df.to_csv(os.path.join(self.tmp_dir, f'df_{str(offset + increment)}.csv'))
            df = pd.DataFrame()

def read_df_from_multi_csv(self):
    all_files = glob.glob(os.path.join(self.tmp_dir, "df_*"))
    df_list = []

    for f in all_files:
        start_time = time.time()
        df_list.append(pd.read_csv(f))
        logger.info(f'time took for reading {f}: {str(round(time.time() - start_time, 2))}')

    return pd.concat((pd.read_csv(f) for f in all_files))

【问题讨论】:

    标签: pandas google-bigquery


    【解决方案1】:

    Pandas 的 read_gbq 函数目前不提供 chunksize 参数(尽管其相反的 to_gbq 函数确实提供了 chunksize 参数)。

    无论如何,您可以通过在 SQL 查询中添加 LIMITOFFSET 来解决您的问题,从 BigQuery 中迭代读取内容。大概是这样的:

    project_id = "xxxxxxxx"
    
    increment=100000
    chunks=range(0, 9000000, 100000)
    
    chunks[-1]+=increment 
    intervals=[[chunks[i-1], chunks[i]+1] for i, e in enumerate(chunks) if i > 0]
    
    query_str="select * from `mydataset.mytable` limit {end} offset {start};"
    
    for start, end in intervals:
       query = query_str.format(start=start, end=end)
       df = pd.read_gbq(query, project_id)
       #-- do stuff with your df here..
    

    【讨论】:

    • 为什么这条线是:chunks[-1]+=increment?
    • 只是为了确保范围是全包的(并且不会因为步长而后退)。
    • 如果对您有帮助,请将答案标记为正确。谢谢:)
    猜你喜欢
    • 2019-09-27
    • 1970-01-01
    • 1970-01-01
    • 2022-01-10
    • 1970-01-01
    • 1970-01-01
    • 2012-02-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多