【问题标题】:Delta Lake: File Not Found ExceptionDelta Lake:找不到文件异常
【发布时间】:2020-05-18 06:38:59
【问题描述】:

我正在使用 Delta Lake 执行合并操作,为此我尝试将 Parquet 文件转换为随时间分区的 delta 格式:

val source = spark.read.parquet("s3a://data-lake/source/")
       source
      .write
      .option("maxRecordsPerFile",20000)
      .mode("overwrite")
      .partitionBy("time")
      //.option("fs.s3a.committer.name", "partitioned") (I even tried using s3a committers)
      .format("delta")
      .save("s3a://data-lake/target/")

数据超过 250G,我的 spark 配置是:

spark.cores.max 420
spark.default.parallelism   10000
spark.delta.logStore.class  org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
spark.driver.extraJavaOptions   -Xms20g
spark.driver.memory 28g
spark.executor.cores    2
spark.executor.memory 28G

在日志中显示 File Not Found Error 并在运行一段时间后最终杀死执行程序:

20/05/16 11:51:02 WARN TaskSetManager: Lost task 208.0 in stage 2.0 (TID 3294, 172.16.145.25, executor 14): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://data-lake/target/time=20190101/part-00208-2c9b5ddd-f2c1-4b8c-9d77-eacb0055ff82.c000.snappy.parquet

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 53 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我尝试将其保存为镶木地板格式,它确实按预期工作。但处理时间要长得多。

【问题讨论】:

  • 尽量不要设置 spark.delta.logStor 和每个文件的最大记录数
  • 我遇到了同样的问题。我认为这与设置无关。这可能与这里的原因相同 - github.com/delta-io/delta/issues/341.

标签: scala apache-spark delta-lake


【解决方案1】:

如果已经在parquet中,可以不使用spark将s3中的文件夹复制到目标位置,初始化为delta表,如下行:

import io.delta.DeltaTable
DeltaTable.forPath("s3a://data-lake/target/"))

更多信息here

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-10
    • 2021-01-27
    • 2022-10-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多