【问题标题】:De-normalizing data in spark scalaspark scala中的非规范化数据
【发布时间】:2017-11-05 19:50:18
【问题描述】:

我有以下从 csv 读取的架构:

val PersonSchema = StructType(Array(StructField("PersonID",StringType,true), StructField("Name",StringType,true)))
val AddressSchema = StructType(Array(StructField("PersonID",StringType,true), StructField("StreetNumber",StringType,true), StructField("StreetName",StringType,true)))

一个人可以有多个地址,并通过 PersonID 关联。

有人可以按照以下案例类定义将记录转换为 PersonAddress 记录吗?

case class Address(StreetNumber:String, StreetName:String)
case class PersonAddress(PersonID:String, Name:String, Addresses:Array[Address])

我尝试了以下方法,但在最后一步出现异常:

val results = personData.join(addressData, Seq("PersonID"), "left_outer").groupBy("PersonID","Name").agg(collect_list(struct("StreetNumber","StreetName")) as "Addresses")
val personAddresses = results .map(data => PersonAddress(data.getAs("PersonID"),data.getAs("Name"),data.getAs("Addresses")))
personAddresses.show

给出错误:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 不能转换为 $line26.$read$$iw$$iw$Address

【问题讨论】:

  • data.getAs("Addresses") 的类型是什么?它不是结构列表吗?你也应该map 处理它,我认为理解起来会很漂亮。
  • 我该怎么做?当我尝试以下操作时,它抱怨“值映射不是 Nothing” val personAddresses = results .map(data => PersonAddress(data.getAs("PersonID"),data.getAs("Name"),data. getAs("Addresses").map(df => Address(df.getAs("StreetNumber"), df.getAs("StreetName"))))
  • data.getAs[Addresses]("Addresses")?
  • 你的意思是这样? val personAddresses = results .map(data => PersonAddress(data.getAs("PersonID"),data.getAs("Name"),data‌​.getAs[Array[Address]]("Addresses"))) 我得到以下错误:java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef 无法转换为 [L$line69.$read$$iw$$iw$Address;
  • @SYL 答案是否帮助您解决了问题?如果没有,发生了什么问题/错误?

标签: scala apache-spark


【解决方案1】:

在这种情况下,最简单的解决方案是使用UDF。首先,将街道号码和名称收集为两个单独的列表,然后使用UDF 将所有内容转换为PersonAddress 的数据框。

val convertToCase = udf((id: String, name: String, streetName: Seq[String], streetNumber: Seq[String]) => {
  val addresses = streetNumber.zip(streetName) 
  PersonAddress(id, name, addresses.map(t => Address(t._1, t._2)).toArray)
})

val results = personData.join(addressData, Seq("PersonID"), "left_outer")
  .groupBy("PersonID","Name")
  .agg(collect_list($"StreetNumber").as("StreetNumbers"), 
       collect_list($"StreetName").as("StreetNames"))
val personAddresses = results.select(convertToCase($"PersonID", $"Name", $"StreetNumbers", $"StreetNames").as("Person"))

这将为您提供如下架构。

root
 |-- Person: struct (nullable = true)
 |    |-- PersonID: string (nullable = true)
 |    |-- Name: string (nullable = true)
 |    |-- Addresses: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- StreetNumber: string (nullable = true)
 |    |    |    |-- StreetName: string (nullable = true)

【讨论】:

  • 如何利用 Spark 的分布式计算特性?或者我们如何优化它以利用它在 Spark 中进行分布式并行处理?
  • @SYL Spark 会自动为您解决这个问题,只要您在集群上而不是本地运行它。
猜你喜欢
  • 2013-12-11
  • 2017-01-14
  • 2016-05-27
  • 2016-07-23
  • 1970-01-01
  • 2012-11-18
  • 2010-10-06
  • 1970-01-01
  • 2013-01-18
相关资源
最近更新 更多