【问题标题】:Spark Scala Split dataframe into equal number of rowsSpark Scala 将数据帧拆分为相同数量的行
【发布时间】:2017-05-23 12:59:49
【问题描述】:

我有一个数据框并希望将其分成相等数量的行。

换句话说,我想要一个数据帧列表,其中每个数据帧都是原始数据帧的一个不相交的子集。

假设输入数据帧如下:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+

我希望将其拆分为 K 个大小相等的数据帧。如果 k = 4,那么可能的结果是:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+

【问题讨论】:

  • 类似于RDDs 在单个转换中没有拆分操作。最简单的方法是将COUNTLIMITOFFSET 结合使用一次...但是,由于Spark 中也没有OFFSET,您可以通过滑动使用解决方法,请参阅stackoverflow.com/questions/31685714/…
  • 输入和输出示例将让您快速得到答案。所以相应地更新你的问题
  • .randomSplit() 方法是否是您在此处提出的要求:stackoverflow.com/questions/43567164/…
  • @SteffenSchmitz randomSplit 给出了不平衡的分布。
  • @RameshMaharjan 我根据需要添加了输入和输出。

标签: scala apache-spark dataframe


【解决方案1】:

另一种解决方案是使用limit和except。以下程序将返回一个包含相同行数的 Dataframe 的数组。除了可能包含较少行的第一个。

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
var newFrames = 0 to numberOfNew map (_ => Seq.empty[Int].toDF) toArray
var size = input.count();
val limit = (size / numberOfNew).toInt

while (size > 0) {
    newFrames(numberOfNew) = input.limit(limit)
    input = input.except(newFrames(numberOfNew))
    size = size - limit
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

+-----+
|value|
+-----+
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+

+-----+
|value|
+-----+
|    5|
|    9|
+-----+

...

【讨论】:

  • 它保持秩序吗?
  • 我希望它保持秩序,但我不完全确定。
  • 我必须导入什么才能使 frames() 工作?
  • @Eva 它应该是 newFrames 而不是 frames。我更新了 sn-p。
  • 这个算法有问题,它并不总是有效。
【解决方案2】:

根据我对您的输入和所需输出的理解,您可以通过grouping 创建row numbers dataframeone groupId

然后您可以根据您的需要在其他地方比较filterdataframerow numberstoring

以下是满足您需求的临时解决方案。你可以根据自己的需要改变

val k = 4

val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")

val newDF = dataFrame.withColumn("grouped", lit("grouping"))

var latestDF = newDF.withColumn("row", row_number() over windowSpec)

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

while(lowLimit < totalCount){
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}

我希望这会给你一个好的开始。

【讨论】:

  • 我在处理这段代码时遇到了一些问题。我以前没有使用过窗口函数。另外,是否需要有 HiveContext?
  • 到底是什么问题?我还没有使用过 HiveContext 但据我了解 HiveContext 用于创建环境(读取、写入和配置设置)。其余的工作是在每个上下文(sqlContext、sparkContext 或 HiveContext)中都相同的转换。以上解决方案仅是所有转换。
  • 我收到此错误。 “无法解析窗口函数‘row_number’。请注意,当前使用窗口函数需要 HiveContext;”以及以下导入:“import org.apache.spark.sql.functions.row_number”
  • 那是因为你还没有导入 spark sql 库import org.apache.spark.sql.functions._
  • 抱歉,我忘记提及我使用的是 Spark 1.6。从这篇文章:stackoverflow.com/questions/40319126/… 看来我确实需要 HiveContext
【解决方案3】:

这是对 Steffen Schmitz 的改进答案,实际上是不正确的。我为后代改进了它并对其进行了推广。不过,我确实想知道大规模的性能。

var numberOfNew = 4  
var input = Seq((1,2),(3,4),(5,6),(7,8),(9,10),(11,12)).toDF
var newFrames = 0 to numberOfNew-1 map (_ => Seq.empty[(Int, Int)].toDF) toArray
var size = input.count();
val limit = (size / numberOfNew).toInt
val limit_fract = (size / numberOfNew.toFloat)
val residual = ((limit_fract.toDouble - limit.toDouble) * size).toInt
var limit_to_use = limit

while (numberOfNew > 0) {

    if (numberOfNew == 1 && residual != 0) limit_to_use = residual  

    newFrames(numberOfNew-1) = input.limit(limit_to_use)
    input = input.except(newFrames(numberOfNew-1))
    size = size - limit
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)

返回单个数据帧:

+---+---+
| _1| _2|
+---+---+
|  7|  8|
|  3|  4|
| 11| 12|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  5|  6|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  9| 10|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+

【讨论】:

    【解决方案4】:

    如果你想把一个数据集分成n个相等的数据集

    double[] arraySplit = {1,1,1...,n}; //you can also divide into ratio if you change the numbers.
    
    List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit,1);
    

    【讨论】:

      【解决方案5】:

      不知道这是否比其他选项更有效,但我认为它至少看起来更漂亮:

      import spark.implicits._
      import org.apache.spark.sql.functions._
      
      val df = Seq(1,2,3,4,5,6,7,8,9,0).toDF
      val split_count = 4
      val to_be_split = df.withColumn("split", monotonically_increasing_id % split_count)
      val dfs = (0 until split_count).map(n => to_be_split.where('split === n).drop('split))
      
      dfs.foreach(_.show)
      +-----+
      |value|
      +-----+
      |    1|
      |    5|
      |    9|
      +-----+
      
      +-----+
      |value|
      +-----+
      |    2|
      |    6|
      |    0|
      +-----+
      
      +-----+
      |value|
      +-----+
      |    3|
      |    7|
      +-----+
      
      +-----+
      |value|
      +-----+
      |    4|
      |    8|
      +-----+
      
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-02-28
        • 2021-04-08
        • 2022-10-02
        • 2020-12-06
        • 2020-05-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多