【发布时间】:2021-10-18 14:28:30
【问题描述】:
我正在尝试处理 4 个每天都在增长的文本文件目录。我需要做的是,如果有人试图搜索发票号码,我应该给他们一个包含它的文件列表。
我能够通过将文本文件中的值加载为 RDD 来映射和减少它们。但是如何获取文件名和其他文件属性呢?
【问题讨论】:
标签: apache-spark
我正在尝试处理 4 个每天都在增长的文本文件目录。我需要做的是,如果有人试图搜索发票号码,我应该给他们一个包含它的文件列表。
我能够通过将文本文件中的值加载为 RDD 来映射和减少它们。但是如何获取文件名和其他文件属性呢?
【问题讨论】:
标签: apache-spark
从 Spark 1.6 开始,您可以将text 数据源和input_file_name 函数组合如下:
斯卡拉:
import org.apache.spark.sql.functions.input_file_name
val inputPath: String = ???
spark.read.text(inputPath)
.select(input_file_name, $"value")
.as[(String, String)] // Optionally convert to Dataset
.rdd // or RDD
Python:
(2.x 之前的版本有问题,转换为 RDD 时可能不会保留名称):
from pyspark.sql.functions import input_file_name
(spark.read.text(input_path)
.select(input_file_name(), "value"))
.rdd)
这也可以与其他输入格式一起使用。
【讨论】:
val content = sqlContext.read.text(inputPath).withColumn("filename", input_file_name)
spark.readStream .option("sep", ",") .schema(someSchema) .option("header", "true") .csv("hdfs://path/") .withColumn("input_file_name", input_file_name)
如果你在 pyspark 中可以试试这个:
test = sc.wholeTextFiles("pathtofile")
您将得到一个结果 RDD,其中第一个元素 = 文件路径,第二个元素 = 内容
【讨论】:
如果您的文本文件足够小,您可以使用SparkContext.wholeTextFiles,它返回的RDD 为(filename,content)。
【讨论】:
如果您的文本文件对于SparkContext.wholeTextFiles 来说太大,您可以使用(简单)自定义InputFormat,然后调用SparkContext.hadoopRDD
InputFormat 需要返回一个元组(文件名,行)而不是行,然后您可以使用查看行内容的谓词进行过滤,然后将其唯一并收集文件名。
在 Spark 中,代码如下所示:
val ft = classOf[FileNamerInputFormat]
val kt = classOf[String]
val vt = classOf[String]
val hadoopConfig = new Configuration(sc.hadoopConfiguration)
sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig)
.filter { case (f, l) => isInteresting(l) }
.map { case (f, _) => f }
.distinct()
.collect()
【讨论】:
您可以使用WholeTextFile() 来实现此目的。但是,如果输入文件很大,那么使用 WholeTextFile() 会适得其反,因为它将整个文件内容放入单个记录中。
在这种情况下检索文件名的最佳方法是使用mapPartitionsWithInputSplit()。您可以在 my blog 上找到使用此方案的工作示例。
【讨论】:
如果您使用数据帧 API,您可以使用来自 org.apache.spark.sql.functions 的 input_file_name 函数从 HDFS 获取文件名。下面的 sn-ps 可能会帮助你理解。
val df = spark.read.csv("/files/")
val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))
val df3 = df.withColumn("file_name", input_file_name())
df2 现在包含名为“file_name”的新字段,其中包含使用split 函数提取的 HDFS 文件名。如果您需要完整的 HDFS 路径,您可以使用 input_file_name() 函数,仅在 df3 中显示。
【讨论】:
直接使用 Spark 似乎有点矫枉过正……如果这些数据要“收集”到驱动程序,为什么不使用 HDFS API? Hadoop 通常与 Spark 捆绑在一起。这是一个例子:
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
val fileSpec = "/data/Invoices/20171123/21"
val conf = new Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://nameNodeEneteredHere"),conf)
val path = new Path(fileSpec)
// if(fs.exists(path) && fs.isDirectory(path) == true) ...
val fileList = fs.listStatus(path)
那么用println(fileList(0)),info(格式化)like this first item(作为例子)可以看成org.apache.hadoop.fs.FileStatus:
FileStatus {
path=hdfs://nameNodeEneteredHere/Invoices-0001.avro;
isDirectory=false;
length=29665563;
replication=3;
blocksize=134217728;
modification_time=1511810355666;
access_time=1511838291440;
owner=codeaperature;
group=supergroup;
permission=rw-r--r--;
isSymlink=false
}
fileList(0).getPath 将提供hdfs://nameNodeEneteredHere/Invoices-0001.avro。
我猜这种读取文件的方法主要是使用 HDFS 名称节点,而不是在每个执行程序中。 TLDR;我打赌 Spark 可能会轮询名称节点以获取 RDD。如果底层的 Spark 调用轮询 namenode 来管理 RDD,也许上面是一个有效的解决方案。尽管如此,提出任一方向的贡献的 cmets 都会受到欢迎。
【讨论】: