我遇到了相同和以下问题。我使用下面的代码来解决这些问题,它对我有用。可能下面的代码会对你有所帮助。
Issue-1 : 如果您正在从流目录读取数据,Spark 将抛出以下异常。
java.IO.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
Issue - 2: 如果 hdfs 目录为空,Spark 将抛出异常并尝试从该目录加载数据。我在加载数据时没有传递架构,如果您传递架构,您可能不会遇到这个问题。
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.
而不是在加载数据时指向 HDFS 目录并获取所需的文件路径并将这些路径传递给 spark load 方法。
在下面的代码中,您可以更好地控制要读取和忽略的文件。
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
override def hasNext: Boolean = remoteIterator.hasNext
override def next(): T = remoteIterator.next()
}
wrapper(remoteIterator)
}
def listFiles(spark: SparkSession,path: String) = {
FileSystem.get(spark.sparkContext.hadoopConfiguration)
.listFiles(new Path(path),true)
.toList.map(_.getPath)
.filter(!_.toString.contains("_spark_metadata"))
.map(_.toString)
}
val files = listFiles(spark,kafka.read.hdfsLocation)
require(files.isEmpty, s"Files are not available to process data.")
spark
.read
.format(read.format)
.options(read.options)
.load(files:_*)