【问题标题】:Spark Structure Streaming fail duo to checkpoint file not found由于未找到检查点文件,Spark Structured Streaming 失败
【发布时间】:2018-07-12 14:09:59
【问题描述】:

我正在测试环境中运行 spark 结构化流。有时会因为找不到某个检查点文件而导致作业失败。

一个原因可能是 kafka 主题的保留时间很短。但我已将.option("failOnDataLoss", "false") 添加到 SparkSession。

我对火花检查点有一些基本(非常基本)的了解。我想如果我删除了检查点目录,它应该可以恢复。但是正如我测试的那样,一旦发生此错误,删除该目录就无济于事。我需要使用不同的检查点目录来修复它。

为什么删除检查点目录不起作用?或者有没有办法/选项可以帮助避免这个错误?

     diagnostics: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 14, master-testspark.runspark.com, executor 2): java.lang.IllegalStateException: Error reading delta file consumer-cp3/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = consumer-cp3/state/0/4]: consumer-cp3/state/0/4/1.delta does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:265)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:200)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1225)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1538)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:332)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:340)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:786)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:407)
    ... 27 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
    at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1238)
    ... 39 more

【问题讨论】:

  • 您找到问题的原因了吗?我一直面临着完全相同的问题...
  • 会在上面发表评论
  • 我也面临同样的问题。有什么解决方案吗?
  • 这里有同样的问题。看起来 Spark 正在尝试从上一次运行中查找恢复中间状态,但该状态文件夹下没有数据。
  • 我想你会发现这个相关的stackoverflow.com/questions/42006664/…

标签: apache-spark spark-structured-streaming spark-checkpoint


【解决方案1】:

Checkpointing 是一个截断RDD lineage graph 并将其保存到可靠的分布式(HDFS)或本地文件系统的过程。

来自 Apache Spark 文档n 两种类型的检查点。

  • 元数据检查点

    将定义流计算的信息保存到fault-tolerant 存储,如HDFS。这用于从运行streaming application 驱动程序的节点故障中恢复。

  • 数据检查点

    这在某些stateful transformations 中是必要的,它们跨多个批次组合数据。在这样的转换中,生成的RDDs 依赖于先前批次的RDDs,这导致依赖链的长度随着时间的推移而不断增加。为了避免这种无限制的恢复时间增加(与依赖链成正比),中间的RDDsstateful transformations 定期checkpointed 到可靠存储(例如HDFS)以切断依赖链。

如果您想查看它的内部运作方式,请查看 Tathagata 的 Spark 博客 Fault-tolerance using checkpointing

Spark 使用 Checkpoint 从失败中恢复。不要重新创建已删除的目录。这就是您收到此异常的原因。

java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta

【讨论】:

  • 我的应用是 Spark Structured Streaming。也许它与 Spark Streaming 关于检查点处理的不同。根据帖子,除非出现故障并且 spark 尝试恢复,否则不会读取检查点文件。但我没有注意到失败。唯一的失败是 FileNotFound 异常。那么在什么情况下 spark 会尝试读取检查点文件而没有任何先前的失败呢?
  • @DeepNightTwo 你找到解决这个问题的办法了吗?
  • 我的也是结构化流媒体,这里也是同样的问题。
猜你喜欢
  • 2021-06-03
  • 1970-01-01
  • 1970-01-01
  • 2022-01-14
  • 2020-09-08
  • 2020-10-29
  • 2019-02-28
  • 1970-01-01
  • 2020-03-19
相关资源
最近更新 更多