【Question Title】: How to write pyarrow parquet data to an S3 bucket?
【Posted】: 2020-03-08 03:16:06
【Question Description】:

I created a dataframe and converted it to a parquet file with pyarrow (also mentioned here):

import pyarrow as pa
import pyarrow.parquet as pq

def convert_df_to_parquet(self, df):
    table = pa.Table.from_pandas(df)
    buf = pa.BufferOutputStream()
    pq.write_table(table, buf)
    return buf

Now I want to upload the result to an S3 bucket, and none of the different input parameters I tried for upload_file() and put_object() work:

s3_client.upload_file(parquet_file, bucket_name, destination_key)#1st
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file)#2nd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.getvalue())#3rd
s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())#4th

Error:

 s3_client.put_object(Bucket=bucket_name, Key=destination_key, Body=parquet_file.read1())
  File "pyarrow/io.pxi", line 376, in pyarrow.lib.NativeFile.read1
  File "pyarrow/io.pxi", line 310, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 320, in pyarrow.lib.NativeFile.read
  File "pyarrow/io.pxi", line 155, in pyarrow.lib.NativeFile.get_input_stream
  File "pyarrow/io.pxi", line 170, in pyarrow.lib.NativeFile._assert_readable
OSError: only valid on readonly files

【Question Discussion】:

    Tags: python amazon-s3 boto3 pyarrow


    【Solution 1】:

    Why not do it directly (Pandas -> S3) and speed things up?

    import awswrangler as wr
    
    wr.pandas.to_parquet(
        dataframe=df,
        path="s3://...",
        dataset=True,
        mode="overwrite",        # Could be append, overwrite or overwrite_partitions
        database="my_database",  # Optional, only if you want it available in the Athena/Glue Catalog
        table="my_table",
        partition_cols=["PARTITION_COL_NAME"])
    

    Reference

    【Discussion】:

    • Quick note: AWS has made breaking changes to the parameter names since this was posted. Here is the updated version:

          def write_dataframe_to_parquet_in_s3(data, s3_path, database, table, **kwargs):
              wr.s3.to_parquet(
                  df=data,
                  path=s3_path,
                  dataset=True,
                  mode="overwrite",
                  database=database,
                  table=table,
                  compression="snappy",
                  max_rows_by_file=kwargs.get("max_rows_by_file", None),
                  partition_cols=kwargs.get("partition_cols_list", None))
    【Solution 2】:

    One more annoying thing about pandas: if your token expires mid-script, then pd.write_parquet("s3://...") will raise a PermissionError even if you are using boto3.Session(). To work around this, I wrote my own pd.to_parquet():

    import boto3
    import pyarrow as pa
    import pyarrow.parquet as pq

    def to_parquet(df, s3_path):
        """ Assumes path starts with s3:// """
        parts = s3_path[5:].split("/")
        bucket, key = (parts[0], "/".join(parts[1:]))

        table = pa.Table.from_pandas(df)
        writer = pa.BufferOutputStream()
        pq.write_table(table, writer)
        body = bytes(writer.getvalue())

        session = boto3.Session(<private variables>)
        s3 = session.client("s3")
        s3.put_object(Body=body, Bucket=bucket, Key=key)
    

    Good luck!

    【Discussion】:

      【Solution 3】:

      From the doc:

      You should do it like this:

      import boto3
      s3 = boto3.resource('s3')
      s3.meta.client.upload_file('/tmp/'+parquet_file, bucket_name, parquet_file)
      

      【Discussion】:
