【问题标题】:Spark: adding column name to csv file failsSpark:将列名添加到 csv 文件失败
【发布时间】:2017-09-13 15:40:52
【问题描述】:

我有“a.txt”,它是 csv 格式并由制表符分隔:

16777216    16777471        -33.4940    143.2104
16777472    16778239    Fuzhou  26.0614 119.3061

然后我运行:

sc.textFile("path/to/a.txt").map(line => line.split("\t")).toDF("startIP", "endIP", "City", "Longitude", "Latitude")

然后我得到了:

java.lang.IllegalArgumentException:要求失败:数量 列不匹配。旧列名 (1): value 新列名 (5): startIP, endIP, City, Longitude, Latitude at scala.Predef$.require(Predef.scala:224) 在 org.apache.spark.sql.Dataset.toDF(Dataset.scala:376) 在 org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:40) ... 47 省略

如果我只是跑步:

res.map(line => line.split("\t")).take(2)

我明白了:

rdd: Array[Array[String]] = Array(Array(16777216, 16777471, "", -33.4940, 143.2104), Array(16777472, 16778239, Fuzhou, 26.0614, 119.3061))

这里有什么问题?

【问题讨论】:

    标签: scala csv apache-spark spark-dataframe


    【解决方案1】:

    正如@user7881163 所指出的,发生错误是因为您的split 生成了一个列,其值(因此Spark 给出的value 名称)是split 生成的标记数组。

    但是,对于来自 @zero323 的每个 cmets,只要确保您使用 collect @user7881163 的版本(采用部分函数的版本),如果您正在大规模操作,因为另一个更常用的 @987654328 @ 会将您的所有数据移动到驱动程序并压倒那台机器。如果您没有大规模运营,为什么还要使用 Spark?

    这是一种稍微不同的方法,也允许丢失城市数据:

    sc.textFile("path/to/a.txt")
      .map(_.split("\t"))
      .map {
          case Array(startIP, endIP, city, longitude, latitude) => (startIP, endIP, Some(city), longitude, latitude)
          case Array(startIP, endIP, longitude, latitude) => (startIP, endIP, None, longitude, latitude)
      }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")
    

    【讨论】:

    • collect 转换不会将任何数据移动到驱动程序。使模式匹配详尽也是一个好主意。
    • 首先,如documentation 中所述,collect 是一个动作——不是转换,这很重要——这样做:“Return数据集的所有元素作为驱动程序中的数组。这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。“如上所述,这在规模上会很糟糕。跨度>
    • 第二,是的,模式匹配应该是详尽的。我没有想到将问题的范围转移到“Scala 最佳实践”。也许我应该有。我只考虑了@derek 指示的数据格式。所以是的,@derek,如果您采用这种方法,请让您的模式匹配详尽或使用日志记录、Try 等正确管理异常。
    • 你说的不是同一个 collect :) github.com/apache/spark/blob/… 相当于 Scala 集合 API 中的 Seq.collect
    • 嗯,好的。我从来没有在任何地方看到过collect 的那个版本,但是是的,这确实让事情保持分布式。我已经进行了相应的编辑。当然,这整点与这个问题并没有真正的密切关系,它会更符合 Stack Overflow 指南,只需编辑答案以使其更好,而不是在一个不是主题的长评论线程中进行甚至对于正在摄取的数据的形状问题也很重要。
    【解决方案2】:

    试试:

    sc
      .textFile("path/to/a.txt")
      .map(line => line.split("\t"))
      .collect { case Array(startIP, endIP, City, Longitude, Latitude) => 
        (startIP, endIP, City, Longitude, Latitude) 
      }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")
    

    或者直接使用csv 来源:

    spark.read.option("delimiter", "\t").csv("path/to/a.txt")
    

    您当前的代码创建了一个DataFrame,其中包含array<string> 类型的单列。这就是为什么当你传递 5 个名字时它会失败。

    【讨论】:

    • 这应该是{case Array(...) => ... } 而不是{case Seq(...) => ... }
    【解决方案3】:

    你可以试试这个例子:

    dataDF = sc.textFile("filepath").map(x=>x.split('\t').toDF();
    
    data = dataDF.selectExpr("_1 as startIP", "_2 as endIP", "_3 as City", "_4 as Longitude", "_5 as Latitude");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-30
      • 2015-08-12
      • 2015-11-29
      相关资源
      最近更新 更多