【问题标题】:Same sink for spark structured streaming and batch?spark结构化流和批处理的相同接收器?
【发布时间】:2020-09-06 00:09:30
【问题描述】:

我有两个 spark 工作。一个是批处理作业,另一个是结构化流式作业。两者都写入同一个文件接收器。两者具有相同的架构。但是,在从该接收器读取数据时,Spark 仅读取流作业创建的文件并跳过批处理作业创建的文件。我可以在文件接收器文件夹中看到一个目录 _spark_metadata。当我删除此文件夹时,火花开始读取所有文件。但是,这并不总是可能的,因为在下一个微批量 spark 将在那里创建另一个 _spark_metadata 文件夹。如何在 spark 中读取此接收器中的所有文件。

【问题讨论】:

    标签: apache-spark hadoop pyspark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    我遇到了相同和以下问题。我使用下面的代码来解决这些问题,它对我有用。可能下面的代码会对你有所帮助。

    Issue-1 : 如果您正在从流目录读取数据,Spark 将抛出以下异常。

    java.IO.FileNotFoundException ... The underlying files may have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    

    Issue - 2: 如果 hdfs 目录为空,Spark 将抛出异常并尝试从该目录加载数据。我在加载数据时没有传递架构,如果您传递架构,您可能不会遇到这个问题。

    org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It must be specified manually.
    
    

    而不是在加载数据时指向 HDFS 目录并获取所需的文件路径并将这些路径传递给 spark load 方法。

    在下面的代码中,您可以更好地控制要读取和忽略的文件。

    import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
    
    implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
        case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
          override def hasNext: Boolean = remoteIterator.hasNext
          override def next(): T = remoteIterator.next()
        }
        wrapper(remoteIterator)
    }
    
    def listFiles(spark: SparkSession,path: String) = {
        FileSystem.get(spark.sparkContext.hadoopConfiguration)    
        .listFiles(new Path(path),true)
        .toList.map(_.getPath)
        .filter(!_.toString.contains("_spark_metadata"))
        .map(_.toString)
    }
    
    val files = listFiles(spark,kafka.read.hdfsLocation)
    require(files.isEmpty, s"Files are not available to process data.")
    spark
        .read
        .format(read.format)
        .options(read.options)
        .load(files:_*)
    
    

    【讨论】:

    • 如果有数千个文件,与 spark 读取方式相比,这是否会使读取时的处理速度变慢?
    • Spark 内部也在做一些事情。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-20
    • 2019-01-14
    • 1970-01-01
    • 2019-08-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多