【发布时间】:2019-01-07 02:20:04
【问题描述】:
我对使用 Spark 很陌生,但我一直被这个问题困扰:
来自我创建的 DataFrame;名为reportesBN,我想获取一个字段的值,以便用它来获取特定路由的TextFile。然后,给该文件一个特定的过程。
我已经开发了这段代码,但它不起作用:
reportesBN.foreach {
x =>
val file = x(0)
val insumo = sc.textFile(s"$file")
val firstRow = insumo.first.split("\\|", -1)
// Get values of next rows
val nextRows = insumo.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val dfNextRows = nextRows.map(a => a.split("\\|")).map(x=> BalanzaNextRows(x(0), x(1),
x(2), x(3), x(4))).toDF()
val validacionBalanza = new RevisionCampos(sc)
validacionBalanza.validacionBalanza(firstRow, dfNextRows)
}
错误日志表明是因为序列化。
7/06/28 18:55:45 INFO SparkContext: Created broadcast 0 from textFile at ValidacionInsumos.scala:56
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
这个问题是由 foreach 内部的 Spark 上下文 (sc) 引起的吗?
还有其他方法可以实现吗?
问候。
【问题讨论】:
标签: scala apache-spark dataframe apache-spark-sql