【问题标题】:How to get files name with spark sc.textFile?如何使用 spark sc.textFile 获取文件名?
【发布时间】:2021-09-14 02:17:40
【问题描述】:

我正在使用以下代码读取文件目录:

val data = sc.textFile("/mySource/dir1/*")

现在我的data rdd 包含目录中所有文件的所有行(对吗?)

我现在想为每一行添加一个带有源文件名的列,我该怎么做?

我尝试过的其他选项是使用 WholeTextFile,但我不断出现内存不足异常。 5 台服务器 24 核 24 GB(executor-core 5 executor-memory 5G) 有什么想法吗?

【问题讨论】:

  • 如果您使用上面的代码 sn-p,我认为没有办法获取文件名。但是,您只能通过 sc.wholeTextFiles("/path/to/dir").keys 获取文件名。但我不认为您的错误是由使用 wholeTextFile 与 textFile 引起的 - 它是由您之后对数据所做的事情引起的。你应该发布你的其他代码。
  • 我没有其他代码,只有 wholeTextFile 和 count()
  • 1.目录中有多少文件; 2. 你有没有先在本地机器上尝试你的代码 3. 你如何运行 spark
  • 目录中有大约1222个文件
  • 本地什么意思,文件在hdfs上

标签: scala apache-spark


【解决方案1】:

您可以使用此代码。我已经用 Spark 1.4 和 1.5 对其进行了测试。

它从inputSplit 获取文件名,并使用iterator 使用NewHadoopRDDmapPartitionsWithInputSplit 将其添加到每一行

import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

val sc = new SparkContext(new SparkConf().setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc ,kc, vc, sc.hadoopConfiguration)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
           .mapPartitionsWithInputSplit((inputSplit, iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tup => (file.getPath, tup._2))
  }
)

linesWithFileNames.foreach(println)

【讨论】:

    【解决方案2】:

    我认为现在回答这个问题已经很晚了,但我找到了一种简单的方法来做你正在寻找的事情:

    步骤 0:从 pyspark.sql 导入函数为 F 第 1 步:像往常一样使用 RDD 创建数据帧。比如说df 第 2 步:使用 input_file_name() df.withColumn("INPUT_FILE", F.input_file_name())

    这将在您的 Dataframe 中添加一个带有源文件名的列。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-08-31
      • 2023-03-19
      • 2015-06-13
      • 2023-04-05
      • 2020-02-10
      • 2016-02-24
      相关资源
      最近更新 更多