【问题标题】:PySpark Drop RowsPySpark 拖放行
【发布时间】:2014-09-03 07:36:03
【问题描述】:

如何从 PySpark 中的 RDD 中删除行?特别是第一行,因为它往往包含我的数据集中的列名。通过仔细阅读 API,我似乎找不到一种简单的方法来做到这一点。当然我可以通过 Bash / HDFS 做到这一点,但我只想知道这是否可以在 PySpark 中完成。

【问题讨论】:

  • 使用filter 过滤掉坏行
  • 如果你只想删除第一行怎么办?并且为了论证,我们不能使用行向量 x 中的任何信息,即我们不能做lambda x: (some condition using x)
  • 查看我的答案可能更接近您要找的内容

标签: python apache-spark pyspark


【解决方案1】:

假设您使用的是 Python 3,在 PySpark (Python API) 中实现此目的的简单方法:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()

【讨论】:

  • python3 不允许元组解包。所以lambda tup: tup[1] >0更好。
  • 感谢@HSRAthore,刚刚更新了 sn-p 代码以避免与 Python 3 混淆
【解决方案2】:

我对各种解决方案进行了一些分析,并有以下内容

集群配置

集群

  • 集群 1:4 核 16 GB
  • 集群 2:4 核 16 GB
  • 集群 3:4 核 16 GB
  • 集群 4:2 核 8 GB

数据

700 万行,4 列

#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)

#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)

#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     if(index==0):
          for subIndex,item in enumerate(iterator):
               if subIndex > 0:
                    yield item
     else:
          yield iterator

data=data.mapPartitionsWithIndex(dropFirstRow)

我认为解决方案 3 是最具可扩展性的

【讨论】:

    【解决方案3】:

    我用 spark2.1 测试过。假设您想在不知道文件列数的情况下删除前 14 行。

    sc = spark.sparkContext
    lines = sc.textFile("s3://location_of_csv")
    parts = lines.map(lambda l: l.split(","))
    parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
    

    withColumn 是一个 df 函数。因此,在上述情况下,以下将无法以 RDD 样式工作。

    parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
    

    【讨论】:

    • 部分和行都是 RDD 的 parts = parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]),因为 parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 不会就地进行更改。
    【解决方案4】:

    特定于 PySpark:

    根据@maasg,您可以这样做:

    header = rdd.first()
    rdd.filter(lambda line: line != header)
    

    但这在技术上并不正确,因为您可能会排除包含数据的行以及标题。但是,这似乎对我有用:

    def remove_header(itr_index, itr):
        return iter(list(itr)[1:]) if itr_index == 0 else itr
    rdd.mapPartitionsWithIndex(remove_header)
    

    同样:

    rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
    

    我是 Spark 的新手,因此无法明智地评论哪个最快。

    【讨论】:

    • 你能解释一下那里发生了什么吗(我是一名 JS 开发人员)? return iter(list(itr)[1:] if itr_index == 0 else itr)? 1) - iter 采用 (object[, sentinel]) - 所以我猜 iter 采用 itr 可迭代对象(行)的列表,然后使用 Python 的 range 运算符,从第二个索引(基于 0)开始,然后迭代一直到itr_index == 0,否则,继续返回itr 行?我问是因为我使用的是相同的东西,但没有出现第一行字段,而是第一行数据变成了字段。
    • iter 可能会混淆这个问题。如果rdd.mapParitionsWithIndex 返回分区的索引,加上作为列表的分区数据,则它只是itr[1:] if itr_index == 0 else itr- 即如果它是第一个分区(即itr_index == 0)则排除第一行(即标题) ,并且它不是第一个分区(即没有标题),只需返回整个分区。 iterlist 是因为它实际上使用的是可迭代对象而不是列表。顺便说一句,我很确定有比iter(list(itr)[1:]) 更有效的路线。
    【解决方案5】:

    AFAIK 没有“简单”的方法可以做到这一点。

    不过,这应该可以解决问题:

    val header = data.first
    val rows = data.filter(line => line != header)
    

    【讨论】:

    • 这是合理的。谢谢!
    • 不应该是data.first吗? data.take(1) 将返回一个长度为 1 的 Array[T]。
    • 不过,只要您的列表中有重复项,此操作就会失败。
    • @SebastianHojas 只要您的数据中有标题。此外,如果您正在读取具有相同标题的多个文件,则此方法有效。基本上,它不会删除您的第一行,而是删除任何看起来像您的第一行的行。
    【解决方案6】:

    我个人认为仅使用过滤器来摆脱这些东西是最简单的方法。但是根据您的评论,我有另一种方法。使用 RDD 使每个分区都是一个数组(我假设每个分区有 1 个文件,并且每个文件的顶部都有违规行),然后跳过第一个元素(这是使用 scala api)。

    data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

    请记住,RDD 的一大特点是它们是不可变的,因此自然而然地删除一行是一件棘手的事情

    更新: 更好的解决方案。
    rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
    与 glom 相同,但没有将所有内容放入数组的开销,因为在这种情况下 x 是一个迭代器

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-12-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-29
      • 1970-01-01
      • 2014-12-01
      • 2012-04-26
      相关资源
      最近更新 更多