【问题标题】:How to slice a pyspark dataframe in two row-wise如何以两行方式对 pyspark 数据帧进行切片
【发布时间】:2018-07-30 19:08:24
【问题描述】:

我在 Databricks 工作。

我有一个包含 500 行的数据框,我想创建两个包含 100 行的数据框,另一个包含剩余的 400 行。

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|

我已经尝试了以下但我收到一个错误

df1 = df[:99]
df2 = df[100:499]


TypeError: unexpected item type: <type 'slice'>

【问题讨论】:

    标签: python pyspark spark-dataframe databricks


    【解决方案1】:

    试试这个方法:

    df1_list = df.collect()[:99] #this will return list    
    df1 = spark.createDataFrame(df1) #convert it to spark dataframe
    

    同样如此:

    df2_list = df.collect()[100:499]
    df2 = spark.createDataFrame(df2)
    

    【讨论】:

      【解决方案2】:

      在这里提供一个更简单的解决方案,更类似于所要求的:

      (适用于 Spark 2.4 +)

      # Starting
      print('Starting row count:',df.count())
      print('Starting column count:',len(df.columns))
      
      # Slice rows
      df2 = df.limit(3)
      print('Sliced row count:',df2.count())
      
      # Slice columns
      cols_list = df.columns[0:1]
      df3 = df.select(cols_list)
      print('Sliced column count:',len(df3.columns))
      

      【讨论】:

        【解决方案3】:

        如果我不介意在两个数据框中有相同的行,那么我可以使用sample。例如我有一个包含 354 行的数据框。

        >>> df.count()
        354
        
        >>> df.sample(False,0.5,0).count() //approx. 50%
        179
        
        >>> df.sample(False,0.1,0).count() //approx. 10%
        34
        

        或者,如果我想在不存在重复的情况下严格拆分,我可以这样做

        df1 = df.limit(100)     //100 rows
        df2 = df.subtract(df1)  //Remaining rows
        

        【讨论】:

        【解决方案4】:

        最初我误解并认为您想对列进行切片。如果要选择行的子集,一种方法是使用monotonically_increasing_id() 创建索引列。来自文档:

        生成的 ID 保证单调递增且 唯一的,但不是连续的。

        您可以使用此 ID 对数据框进行排序并使用 limit() 对其进行子集化,以确保您获得所需的行。

        例如:

        import pyspark.sql.functions as f
        import string
        
        # create a dummy df with 500 rows and 2 columns
        N = 500
        numbers = [i%26 for i in range(N)]
        letters = [string.ascii_uppercase[n] for n in numbers]
        
        df = sqlCtx.createDataFrame(
            zip(numbers, letters),
            ('numbers', 'letters')
        )
        
        # add an index column
        df = df.withColumn('index', f.monotonically_increasing_id())
        
        # sort ascending and take first 100 rows for df1
        df1 = df.sort('index').limit(100)
        
        # sort descending and take 400 rows for df2
        df2 = df.sort('index', ascending=False).limit(400)
        

        只是为了验证这是否符合您的要求:

        df1.count()
        #100
        df2.count()
        #400
        

        我们还可以验证索引列不重叠:

        df1.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
        #+---+---+
        #|min|max|
        #+---+---+
        #|  0| 99|
        #+---+---+
        
        df2.select(f.min('index').alias('min'), f.max('index').alias('max')).show()
        #+---+----------+
        #|min|       max|
        #+---+----------+
        #|100|8589934841|
        #+---+----------+
        

        【讨论】:

        • 使用 monotonically_increasing_id() 可以将单调递增的 id 添加到表中,但它不是连续的。所以如果你使用 limit(100),你不会得到 100 行
        • @Gavin 您将得到正好 100 行,因为 limit(n) 将准确返回 n 行(假设表中有那么多行,如果没有,它将返回所有行。)。这仅依赖于monotonically_increasing_id() 对表格进行排序。
        • 知道了。但我不能使用由 monotonically_increasing_id 创建的索引来获取 200 到 300 行。对吗?
        • @Gavin 没错,你不能。但正如我在my answer to your other question 中解释的那样,不建议依赖数据的顺序。在这个例子中,OP 有一个 500 行的 DataFrame——这种技术可能不能很好地概括更大的数据。
        【解决方案5】:

        Spark 数据帧不能像你写的那样被索引。您可以使用 head 方法创建来获取 n 顶行。这将返回 Row() 对象的列表,而不是数据框。因此,您可以将它们转换回数据帧并使用原始数据帧中的减法来获取其余行。

        #Take the 100 top rows convert them to dataframe 
        #Also you need to provide the schema also to avoid errors
        df1 = sqlContext.createDataFrame(df.head(100), df.schema)
        
        #Take the rest of the rows
        df2 = df.subtract(df1)
        

        如果您在 spark 2.0+ 上工作,您也可以使用 SparkSession 而不是 spark sqlContext。此外,如果您对前 100 行不感兴趣并且想要随机拆分,您可以像这样使用randomSplit

        df1,df2 = df.randomSplit([0.20, 0.80],seed=1234)
        

        【讨论】:

          猜你喜欢
          • 2016-05-08
          • 1970-01-01
          • 2019-03-18
          • 2016-01-28
          • 2013-06-20
          • 2021-08-10
          • 1970-01-01
          • 2018-03-29
          • 2023-02-03
          相关资源
          最近更新 更多