【Question Title】: Is it possible to remove files from a Spark Streaming folder?
【Posted】: 2017-07-27 08:55:15
【Question Description】:

Spark 2.1. An ETL process converts files from the source system to parquet and puts the resulting small parquet files into folder1. A Structured Streaming query on folder1 works fine, but the parquet files in folder1 are too small for HDFS. We have to merge the small parquet files into bigger ones, and whenever I try to delete files from folder1, the streaming query fails with an exception:

17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error java.io.FileNotFoundException: File does not exist

Is it possible to merge parquet files inside a folder that Spark Streaming is reading from?
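
For reference, the merge step itself is plain batch code run outside the stream. A minimal sketch, assuming a separate target folder /folder1/compacted (that path is made up for illustration):

// Read the small parquet files as a batch, reduce the number of output files,
// and write the result to a separate folder so the streamed folder is untouched.
val small = spark.read.parquet("/folder1/folder2")
small
  .coalesce(4)                 // fewer, larger output files; tune to the HDFS block size
  .write
  .mode("overwrite")
  .parquet("/folder1/compacted")   // illustrative target path, not from the original post

The open question is how to get such compacted files back into the streamed folder without the running query noticing the deleted originals.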

    Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.types._

val userSchema = new StructType()
  .add("itemId", "string")
  .add("tstamp", "integer")
  .add("rowtype", "string")
  .add("rowordernumber", "integer")
  .add("parentrowordernumber", "integer")
  .add("fieldname", "string")
  .add("valuestr", "string")

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

csvDF.createOrReplaceTempView("tab1")
val aggDF = spark.sql("select distinct count(itemId) as cases_count from tab1")
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()

// Exiting paste mode, now interpreting.

+-----------+
|cases_count|
+-----------+
+-----------+

import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(itemId,StringType,true), StructField(tstamp,IntegerType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [itemId: string, tstamp: int ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [cases_count: bigint]

scala> -------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  324016758|
|  292086106|
+-----------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  355839229|
|  324016758|
|  292086106|
+-----------+

17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error
java.io.FileNotFoundException: File does not exist: /folder1/folder2/P-FMVDBAF-4021-20161107152556-1_006.gz.parquet

【Question Comments】:

  • Is this Spark Streaming or Structured Streaming? Would you mind sharing some code? It looks like Structured Streaming. Could you also include the whole stack trace?
  • I have updated the main post with sample code. Yes, it is Structured Streaming; I am using spark-shell to execute the code.

Tags: spark-streaming parquet


【Solution 1】:

You can use globbing to process only the files you need, like this:

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2/bigger_file*.parquet")
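
To make compacted output visible to such a stream, the new files must have names matching the glob. Spark does not let you choose output file names directly, so one option (an assumption, not part of the original answer) is to compact into a separate folder and then rename the part files with the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: move compacted part files into the streamed folder under names that
// match the "bigger_file*.parquet" glob. The /folder1/compacted path and the
// bigger_file_ prefix are illustrative, not from the original post.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/folder1/compacted"))
  .filter(_.getPath.getName.endsWith(".parquet"))
  .zipWithIndex
  .foreach { case (status, i) =>
    fs.rename(status.getPath, new Path(s"/folder1/folder2/bigger_file_$i.parquet"))
  }

Because the stream only tracks files matching the glob, the original small files can then be merged and deleted without the query throwing FileNotFoundException.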

【Discussion】: