【问题标题】:PySpark Access DataFrame columns at foreachPartition() custom functionPySpark 在 foreachPartition() 自定义函数中访问 DataFrame 列
【发布时间】:2018-05-22 00:11:10
【问题描述】:

我有一个名为“inside”的函数。我想将此功能应用于 pyspark 数据框。为此,我在我创建的数据框上调用“foreachPartition(inside)”方法。 “内部”函数需要数据框的值。

数据框如下所示:

>>> small_df
DataFrame[lon: double, lat: double, t: bigint]

代码如下所示:

def inside(iterator):
    row=iterator
    x=row.lon
    y=row.lat
    i=row.t 
    #do more stuff

small=pliades.iloc[0:20000,:] #take sample of rows from big dataset
small_df=sqlContext.createDataFrame(small) #create dataframe
test=small_df.foreachPartition(inside)

我的问题是:x,y,i 如何分别获取数据帧的第一(lon)、第二(lat)和第三(t)列的值?

我也尝试使用 row.lon、row.select 来处理,将其视为列表但无法获得所需的结果。

【问题讨论】:

    标签: python apache-spark dataframe iterator pyspark


    【解决方案1】:

    foreachRDD[Row] 上运行,每个分区为Iterator[Row]。如果您想列出所有值(由于可能存在内存问题,不推荐使用

    def inside(iterator):
        x, y, i = zip(*iterator)
        ...
        yield ...
    

    一般来说,最好只逐行迭代,而不将所有内容都保存在内存中:

    def inside(iterator):
        for x, y, i in iterator:
            yield ...
    

    你也可以考虑使用pandas_udf

    • 如果函数返回相同数量的值并且只返回一个列,您可以使用标量类型,它采用 pandas.Series 并返回 pandas.Series

      from pyspark.sql.functions import pandas_udf, PandasUDFType
      
      @pandas_udf(schema, PandasUDFType.SCALAR)
      def f(*cols: pandas.Series) -> pandas.Series:
          ...
      
      df.select(f("col1", "col2", ...))
      
    • 采用pandas.DataFrame 并返回具有相同或不同行数的pandas.DataFrame 的分组变体:

      from pyspark.sql.functions import spark_partition_id
      
      
      
      @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
      def g(df: pandas.DataFrame) -> pandas.DataFrame:
          ...
      
      df.groupby(spark_partition_id()).apply(g)
      

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-22
      • 1970-01-01
      • 2019-06-09
      • 2017-03-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多