【Question Title】: Nested data from an RDD in Scala Spark
【Posted】: 2016-12-09 21:01:09
【Question Description】:

My sample data looks like this:

{ Line 1
Line 2
Line 3
Line 4
...
...
...
Line 6



Complete info:
Dept : HR
Emp name is Andrew lives in Colorodo
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Alex lives in Texas
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Mathew lives in California
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016

Dept : QC
Emp name is Nguyen lives in Nevada
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Cassey lives in Newyork
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Ronney lives in Alasca
DOB : 03/09/1958
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016


line21
line22
line23
...
}

The output I need:

{

Dept    Empname     State       DOB     Projectname         DOJ     DOL
HR  Andrew      Colorodo    03/09/1958  Healthcare      06/04/2011  09/21/2011
HR  Andrew      Colorodo    03/09/1958  Retail          11/04/2011  08/21/2013
HR  Andrew      Colorodo    03/09/1958  Audit           09/11/2013  09/01/2014
HR  Andrew      Colorodo    03/09/1958  ControlManagement   06/04/2011  09/21/2011
HR  Alex        Texas       03/09/1958  Healthcare      06/04/2011  09/21/2011
HR  Alex        Texas       03/09/1958  ControlManagement   06/04/2011  09/21/2011
HR  Mathews     California  03/09/1958  Healthcare      06/04/2011  09/21/2011
HR  Mathews     California  03/09/1958  Retail          11/04/2011  08/21/2013
HR  Mathews     California  03/09/1958  Audit           09/11/2013  09/01/2014
HR  Mathews     California  03/09/1958  ControlManagement   06/04/2011  09/21/2011
QC  Nguyen      Nevada      03/09/1958  Healthcare      06/04/2011  09/21/2011
QC  Nguyen      Nevada      03/09/1958  Retail          11/04/2011  08/21/2013
QC  Nguyen      Nevada      03/09/1958  Audit           09/11/2013  09/01/2014
QC  Nguyen      Nevada      03/09/1958  ControlManagement   06/04/2011  09/21/2011
QC  Casey       Newyork     03/09/1958  Healthcare      06/04/2011  09/21/2011
QC  Casey       Newyork     03/09/1958  Retail          11/04/2011  08/21/2013
QC  Casey       Newyork     03/09/1958  Audit           09/11/2013  09/01/2014
QC  Casey       Newyork     03/09/1958  ControlManagement   06/04/2011  09/21/2011}

I tried the following options:

1) Considered using a map inside another map and then matching. Got lots of errors. Then I read an article which explained that my map cannot contain another map — in fact, no RDD transformation can be nested inside another one. Sorry, I'm new to Spark.

2) Tried using regular expressions and then calling map on the captured groups. But since each department has multiple employees, and each employee has multiple project entries, I cannot repeatedly group that part of the data, nor map the projects back to the corresponding employee. The same problem applies to employee and department details.

Q1: Is it possible to transform the sample data above into the desired format in Spark/Scala?

Q2: If so, what logic/concepts should I pursue?

Thanks in advance.

【Question Discussion】:

  • This is not a perfect fit for Spark. Any linear pass is usually not best done in Spark. It is quite straightforward in plain Scala, though — why not preprocess the file that way and feed the result into Spark for later processing?
  • How big is the data? Do you really need Spark?
  • The data is about 75 GB. If there is any solution/logic usable in Spark (even complex/verbose/inefficient code), I'd like to try it before trying anything else. Any ideas? Thanks.

Tags: scala apache-spark nested-loops


【Solution 1】:

Q1: Can this nested data format be transformed with Spark?

A1: Yes. If the records were more fine-grained, I would suggest the multi-line approach discussed in this question: How to process multi line input records in Spark

However, given the large amount of data contained under each "Dept", I would not recommend that here.
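For reference, the grouping idea behind that multi-line approach — splitting the stream into records at a delimiter line — can be sketched in plain Scala. Using the "Dept" header as the delimiter is my assumption based on the sample data:

```scala
// Sketch: group raw lines into per-department blocks by splitting on
// "Dept" header lines. The delimiter choice is an assumption.
def splitByDept(lines: Seq[String]): Seq[Seq[String]] =
  lines.foldLeft(Vector.empty[Vector[String]]) { (blocks, raw) =>
    val line = raw.trim
    if (line.startsWith("Dept")) blocks :+ Vector(line)      // start a new block
    else if (blocks.isEmpty || line.isEmpty) blocks          // skip preamble / blank lines
    else blocks.init :+ (blocks.last :+ line)                // append to the current block
  }
```

Each resulting block could then be parsed independently — but with everything for a department inside one block, the blocks themselves stay large, which is why I don't recommend it for this data.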

Q2: What logic/concepts should I follow?

A2: This kind of linear processing, where state is built up as we pass over the lines, is best done with an iterator- or stream-based implementation:

We consume one line at a time and produce a record only when it is complete; the context is kept in some state. With this approach it hardly matters how big the file is, because the memory requirement is limited to the size of one record plus the overhead of the state handling.

Here is a working example of how to handle this with an iterator in plain Scala:

case class EmployeeRecord(dept: String, name: String, location: String, dob: String, project: String, joined: String, left: String) {
  def toCSV = this.productIterator.mkString(", ")
}


class EmployeeParser() {

  var currentStack : Map[String, String] = Map()

  val (dept, name, location, birthdate, project, joined, left) = ("dept", "name", "location", "birthdate", "project", "joined", "left")
  val keySequence = Seq(dept, name, location, birthdate, project, joined, left)
  val ParseKeys = Map("Project name" -> project, "DOJ" -> joined, "DOL" -> left, "DOB" -> birthdate, "Dept" -> dept)
  val keySet = keySequence.toSet

  def clearDependencies(key: String) : Unit = {
    val keepKeys = keySequence.dropWhile(k => k != key).toSet
    currentStack = currentStack.filterKeys(k => !keepKeys.contains(k))
  }

  def isValidEntry(key: String) : Boolean = {
    val precedents = keySequence.takeWhile(k => k != key).drop(1)
    precedents.forall(k => currentStack.contains(k))
  }

  def add(key:String, value:String): Option[Unit] = {
    if (!isValidEntry(key)) None else {
      clearDependencies(key)
      currentStack = currentStack + (key -> value)
      Some(())
    }
  } 

  def record: Option[EmployeeRecord] = 
    for {
      _dept <- currentStack.get(dept)
      _name <- currentStack.get(name)
      _location <- currentStack.get(location)
      _dob <- currentStack.get(birthdate)
      _project <- currentStack.get(project)
      _joined <- currentStack.get(joined)
      _left <- currentStack.get(left)
    } yield EmployeeRecord(_dept, _name, _location, _dob, _project,_joined, _left)

  val EmpRegex = "^Emp name is (.*) lives in (.*)$".r
  def parse(line:String):Option[EmployeeRecord] = {
    if (line.startsWith("Emp")) { // this line doesn't follow the "key : value" layout, so handle it separately
      line match {
        case EmpRegex(n, l) => add(name, n); add(location, l)
        case _              => () // malformed "Emp" line: ignore
      }
      None
    } else {
      val entry = line.split(":").map(_.trim)
      for { entryKey <- entry.lift(0)
            entryValue <- entry.lift(1)
            key <- ParseKeys.get(entryKey)
            _ <- add(key, entryValue)
            rec <- record
          } yield rec
    }
  }
}

To use it, we instantiate the parser and apply it to an iterator:

import scala.io.Source

val iterator = Source.fromFile(...).getLines
val parser = new EmployeeParser()
val parsedRecords = iterator.map(parser.parse).collect{case Some(record) => record}
val parsedCSV = parsedRecords.map(rec => rec.toCSV)
parsedCSV.foreach { line => /* write to destination file */ }
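To see the flattening end-to-end without file I/O, here is a simplified, self-contained sketch of the same state-accumulating idea using a foldLeft over the lines. The `Ctx` class and its field names are mine, not part of the answer above; it trades the answer's validity checks for brevity:

```scala
object FlattenSketch {
  // Mutable-free context carried through the fold; fields mirror the columns.
  case class Ctx(dept: String = "", name: String = "", state: String = "",
                 dob: String = "", project: String = "", doj: String = "")

  val Emp = "Emp name is (.*) lives in (.*)".r

  // Emit one flat row per completed (Project, DOJ, DOL) triple.
  def flatten(lines: Seq[String]): Seq[Seq[String]] = {
    def value(s: String) = s.split(":")(1).trim
    val init = (Ctx(), Vector.empty[Seq[String]])
    val (_, rows) = lines.map(_.trim).foldLeft(init) { case ((ctx, acc), line) =>
      line match {
        case s if s.startsWith("Dept")         => (ctx.copy(dept = value(s)), acc)
        case Emp(n, st)                        => (ctx.copy(name = n, state = st), acc)
        case s if s.startsWith("DOB")          => (ctx.copy(dob = value(s)), acc)
        case s if s.startsWith("Project name") => (ctx.copy(project = value(s)), acc)
        case s if s.startsWith("DOJ")          => (ctx.copy(doj = value(s)), acc)
        case s if s.startsWith("DOL")          => // project entry complete: emit a row
          (ctx, acc :+ Seq(ctx.dept, ctx.name, ctx.state, ctx.dob, ctx.project, ctx.doj, value(s)))
        case _                                 => (ctx, acc) // unrelated line: skip
      }
    }
    rows
  }
}
```

Note this sketch holds the emitted rows in memory; the iterator version above keeps only one record's worth of state, which is what makes it suitable for very large files.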

【Discussion】:
