【问题标题】:How to add column with alternate values in PySpark dataframe?如何在 PySpark 数据框中添加具有备用值的列?
【发布时间】:2019-09-20 23:13:02
【问题描述】:

我有以下示例数据框

df = spark.createDataFrame([('start','end'), ('start1','end1')] ,["start", "end"])

我想分解每行中的值并在生成的行中关联交替的 1-0 值。这样我可以识别每一行中的开始/结束条目。

我可以通过这种方式达到预期的效果

from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = (df.withColumn('start_end', fn.array('start', 'end'))
        .withColumn('date', fn.explode('start_end'))
        .withColumn('row_num', fn.row_number().over(w)))
df = (df.withColumn('is_start', fn.when(fn.col('row_num')%2 == 0, 0).otherwise(1))
        .select('date', 'is_start'))

给了

| date   | is_start |
|--------|----------|
| start  | 1        |
| end    | 0        |
| start1 | 1        |
| end1   | 0        |

但是对于这样一个简单的任务来说似乎过于复杂了。

不使用 UDF 有没有更好/更清洁的方法?

【问题讨论】:

    标签: apache-spark pyspark pyspark-sql


    【解决方案1】:

    您可以将pyspark.sql.functions.posexplodepyspark.sql.functions.array 一起使用。

    首先从您的 startend 列中创建一个数组,然后将其与位置展开:

    from pyspark.sql.functions import array, posexplode
    
    df.select(posexplode(array("end", "start")).alias("is_start", "date")).show()
    #+--------+------+
    #|is_start|  date|
    #+--------+------+
    #|       0|   end|
    #|       1| start|
    #|       0|  end1|
    #|       1|start1|
    #+--------+------+
    

    【讨论】:

      【解决方案2】:

      你可以试试union:

      df = spark.createDataFrame([('start','end'), ('start1','end1')] ,["start", "end"])
      df = df.withColumn('startv', F.lit(1))
      df = df.withColumn('endv', F.lit(0))
      df = df.select(['start', 'startv']).union(df.select(['end', 'endv']))
      df.show()
      
      +------+------+
      | start|startv|
      +------+------+
      | start|     1|
      |start1|     1|
      |   end|     0|
      |  end1|     0|
      +------+------+
      

      您可以从这里开始重命名列并重新排列行。

      【讨论】:

        【解决方案3】:

        我的用例中有类似的情况。在我的情况下,我有巨大的数据集(~50GB)并且进行任何自连接/重转换都会导致更多的内存和不稳定的执行。

        我将数据集再降一级并使用 rdd 的平面图。这将使用 map 端转换,并且在 shuffle、cpu 和内存方面具有成本效益。

        df = spark.createDataFrame([('start','end'), ('start1','end1')] ,["start", "end"])
        df.show()
        +------+----+
        | start| end|
        +------+----+
        | start| end|
        |start1|end1|
        +------+----+
        
        final_df = df.rdd.flatMap(lambda row: [(row.start, 1), (row.end, 0)]).toDF(['date', 'is_start'])
        final_df.show()
        +------+--------+
        |  date|is_start|
        +------+--------+
        | start|       1|
        |   end|       0|
        |start1|       1|
        |  end1|       0|
        +------+--------+
        
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2018-06-16
          • 2021-07-09
          • 2019-04-06
          • 1970-01-01
          • 2022-01-17
          • 1970-01-01
          相关资源
          最近更新 更多