【问题标题】:How can i save RDD to a single parquet file?如何将 RDD 保存到单个镶木地板文件中?
【发布时间】:2016-10-04 17:20:43
【问题描述】:

我使用 pyspark 2.0、hadoop 2.7.2。 这是我的代码:

def func(df):
    new_df = pd.DataFrame(df['id'])
    new_df['num'] = new_df['num'] * 12
    return new_df

set = sqlContext.read.parquet("data_set.parquet")
columns = set.columns
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                                                   columns=columns)))

现在,我需要将 ma​​p_res RDD 保存为 parquet 文件 new.parquet。 有什么办法可以在保存之前不创建大型数据框的情况下做到这一点?或者有没有可能单独保存RDD的每个分区,然后合并所有保存的文件?

附:由于它的大小非常大,我想在不创建数据框的情况下进行管理。

【问题讨论】:

  • @santon 似乎需要将所有单个数据帧合并成一个保留模式的大数据帧。将它们保留为 RDD 的元素将不允许像使用 DataFrame 那样对结果进行操作。
  • @ИванСудос 正确,所以我不希望所有数据都移动到一个节点
  • @santon 当您将管道制作为单个镶木地板文件时,因为参数更容易处理

标签: python hadoop apache-spark pyspark rdd


【解决方案1】:

只有两种方法可以做到这一点:

一个是使用"coalesce(1)" 这将确保所有数据都保存到 1 个文件而不是多个文件中(200 是 spark 默认分区数)使用dataframe.write.save("/this/is/path")

另一种选择是将输出写入配置单元表,然后使用hive -e "select * from table" > data.tsv,它将用制表符分隔。

【讨论】:

    【解决方案2】:

    我建议这样做:

    dataframes = []
    #creating index
    map_res = map_res.zipWithIndex()
    # setting index as key
    map_res = map_res.map(lambda x: (x[1],x[0]))
    # creating one spark df per element
    for i in range(0, map_res.count()):
        partial_dataframe_pd  = map_res.lookup(i)
        partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd)
        dataframes.append(partial_dataframe)
    # concatination
    result_df = dataframes.pop()
    for df in dataframes:
        result_df.union(df)   
    #saving
    result_df.write.parquet("...")
    

    如果您的分区数量较少(2-100),那么它应该工作得相当快。

    【讨论】:

      【解决方案3】:

      要以 Parquet 格式保存文件,您需要将 Rdd 转换为 DataFrame,因为 Parquet 文件总是需要一个模式来处理。

      【讨论】:

        【解决方案4】:

        你可以使用:

        set.coalesce(1).write.parquet("myFile.parquet")
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2017-04-25
          • 2018-05-14
          • 1970-01-01
          • 2020-03-11
          • 2021-07-05
          • 2019-01-08
          • 1970-01-01
          相关资源
          最近更新 更多