【Question Title】: Spark File Streaming: get file names
【Posted】: 2020-02-10 05:26:43
【Question】:

I need to know the names of the input files that are being streamed from the input directory.

Below is my Spark file streaming code, written in Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    //create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() //even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames();

    query.awaitTermination()

  }}

But I get the following exception while streaming:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]

【Comments】:

    Tags: scala apache-spark spark-streaming filestream


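    【Solution 1】:

    You can use the input_file_name() function, defined in org.apache.spark.sql.functions, to get the name of the file from which each row was read into the DataFrame. Unlike inputFiles, which triggers batch analysis of the plan and therefore fails on a streaming source, input_file_name() is an ordinary column expression that is evaluated per row once the query has been started with writeStream.start().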

    sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
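
For context, here is a minimal sketch of how that one-liner might fit into the streaming job from the question. The object name FileNameStreamExample, the helper addFileName, and the two-column schema (id, value) are assumptions made for illustration; they are not from the original post. The explicit schema is there because file-based streaming sources normally need one unless spark.sql.streaming.schemaInference is enabled.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FileNameStreamExample {

  // Hypothetical helper: tags every row with the path of the file it came from.
  def addFileName(df: DataFrame): DataFrame =
    df.withColumn("FileName", input_file_name())

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("FileNameStreamExample")
      .getOrCreate()

    // Same directories as in the question.
    val inputDir = "src/main/resources/stream_input"
    val checkpointDir = "src/main/resources/chkpoint_dir"

    // Assumed schema for illustration only; adjust to the real CSV layout.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType)
    ))

    // Build the streaming DataFrame and attach the source file name per row.
    val fileStreamDf = addFileName(
      spark.readStream.schema(schema).csv(inputDir)
    )

    // The file names show up as a regular column in the console sink output.
    val query = fileStreamDf.writeStream
      .format("console")
      .option("truncate", "false") // print full paths instead of cutting them off
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

With this layout every row printed by the console sink carries the full path of its source file in the FileName column. If only the distinct file names per micro-batch are of interest, one option (on Spark 2.4 or later) would be to run a distinct on that column inside foreachBatch.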
    

