【问题标题】:How to read file names from column in DataFrame to process using SparkContext.textFile?如何从 DataFrame 中的列中读取文件名以使用 SparkContext.textFile 进行处理?
【发布时间】:2019-01-07 02:20:04
【问题描述】:

我对使用 Spark 很陌生,但我一直被这个问题困扰:

来自我创建的 DataFrame;名为reportesBN,我想获取一个字段的值,以便用它来获取特定路由的TextFile。然后,给该文件一个特定的过程。

我已经开发了这段代码,但它不起作用:

reportesBN.foreach { 
      x => 
        val file = x(0)
        val insumo = sc.textFile(s"$file")

        val firstRow = insumo.first.split("\\|", -1)

        // Get values of next rows
        val nextRows = insumo.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

        val dfNextRows = nextRows.map(a => a.split("\\|")).map(x=> BalanzaNextRows(x(0), x(1),
          x(2), x(3), x(4))).toDF() 

        val validacionBalanza = new RevisionCampos(sc)
        validacionBalanza.validacionBalanza(firstRow, dfNextRows)
}

错误日志表明是因为序列化。

7/06/28 18:55:45 INFO SparkContext: Created broadcast 0 from textFile at ValidacionInsumos.scala:56
Exception in thread "main" org.apache.spark.SparkException: Task not serializable

这个问题是由 foreach 内部的 Spark 上下文 (sc) 引起的吗?

还有其他方法可以实现吗?

问候。

【问题讨论】:

    标签: scala apache-spark dataframe apache-spark-sql


    【解决方案1】:

    A very similar question you asked before 也是同样的问题 - 您不能在 RDD 转换或操作中使用 SparkContext。在这种情况下,您在reportesBN.foreach 中使用sc.textFile(s"$file"),正如您所说的那样是DataFrame

    来自我创建的 DataFrame;叫reportesBN

    您应该重写您的转换以从 DataFrame 中获取一个文件,然后再读取它。

    // This is val file = x(0)
    // I assume that the column name is `files`
    val files = reportesBN.select("files").as[String].collectAsList
    

    一旦您有了要处理的文件集合,您就可以执行代码块中的代码。

    files.foreach { 
          x => ...
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-07-29
      • 1970-01-01
      • 1970-01-01
      • 2018-07-07
      • 2022-01-24
      相关资源
      最近更新 更多