【问题标题】:Apache Spark (Structured Streaming) : S3 Checkpoint supportApache Spark(结构化流):S3 检查点支持
【发布时间】:2017-06-19 18:53:14
【问题描述】:

来自 spark 结构化流式文档: “此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时设置为 DataStreamWriter 中的选项。”

果然,将检查点设置为 s3 路径会抛出:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

这里有几个问题:

  1. 为什么不支持 s3 作为检查点目录(常规 spark 流支持此)?是什么让文件系统“符合 HDFS”?
  2. 我临时使用 HDFS(因为集群可以随时启动或关闭)并使用 s3 作为保存所有数据的位置 - 在这种设置中存储结构化流数据的检查点数据的建议是什么?

【问题讨论】:

  • 这里纯属猜测,但您尝试过 s3n 或 s3a(最好是 s3a)协议吗?
  • 绝对值得尝试,会尝试的。

标签: apache-spark spark-structured-streaming


【解决方案1】:

是什么让 FS HDFS “兼容”?它是一个文件系统,具有Hadoop FS specification 中指定的行为。对象存储和 FS 之间的区别在此处进行了介绍,关键是“最终一致的对象存储没有附加或 O(1) 原子重命名不兼容”

特别是对于 S3

  1. 不一致:创建新的 blob 后,列表命令通常不会显示它。删除也一样。
  2. 当 blob 被覆盖或删除时,可能需要一段时间才能消失
  3. rename()是通过复制然后删除来实现的

通过将所有内容保存到某个位置然后将其重命名为检查点目录来激发流检查点。这使得检查点的时间与在 S3 中复制数据的时间成正比,约为 6-10 MB/s。

当前的流代码不适合 s3

现在,做一个

  • 检查点到 HDFS,然后复制结果
  • 检查点到分配并附加到您的集群的 EBS 位
  • 检查点到 S3,但检查点之间的间隔很长,因此检查点的时间不会导致您的流式应用程序停机。

如果您使用的是 EMR,则可以为一致的、由 dynamo DB 支持的 S3 支付额外费用,这可为您提供更好的一致性。但是复制时间还是一样的,所以检查点也会一样慢

【讨论】:

  • 我们在检查点到 S3 之间有 40 秒的间隔,但我们仍然偶尔会遇到检查点问题,例如临时目录被写入然后找不到。
  • 未找到检查点可能是 s3 的一致性表面:列表往往滞后于对象存储中的更改。通常你不会注意到,但有时它会浮出水面。将 dynamo 用于元数据存储应该可以工作......至少如果没有,它的实施是错误的
【解决方案2】:

这是一个已知问题:https://issues.apache.org/jira/browse/SPARK-19407

应该在下一个版本中修复。您可以使用--conf spark.hadoop.fs.defaultFS=s3 将默认文件系统设置为 s3 作为解决方法。

【讨论】:

  • 不要认为这个问题已经解决了。仍然无法在 S3 (spark 2.1.1) 上检查点结构化流。检查点恢复失败:7/06/29 00:29:00 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint org.apache.spark.sql.AnalysisException: 此查询不支持从检查点位置恢复。
  • 这是一个不同的问题。您使用的是不支持恢复的“内存”或“控制台”吗?
  • 我尝试在客户端模式下在 yarn 中使用 spark。并且有同样的问题
【解决方案3】:

此问题已在https://issues.apache.org/jira/browse/SPARK-19407 中修复。

但是,由于 S3 中缺乏最终一致性,结构化流检查点在 S3 中效果不佳。使用 S3 检查点 https://issues.apache.org/jira/browse/SPARK-19013 不是一个好主意。

Micheal Armburst 表示这不会在 Spark 中修复,解决方案是等待 S3guard 实施。 S3Guard 有时会消失。

【讨论】:

  • 我能问一下这是否改变了吗?
【解决方案4】:

是的,如果您使用的是 Spark Structured Streaming 版本 3 或更高版本。首先,创建一个 SparkSession 并将 S3 配置添加到其上下文中。

val sparkSession = SparkSession
    .builder()
    .master(sparkMasterUrl)
    .appName(appName)
    .getOrCreate()

sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "accessKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secretKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://s3URL:s3Port")
sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

稍后,在您开始查询之前,将 checkpointLocation 配置与 S3 存储桶一起添加。例如:

val streamingQuery = streamingDF.writeStream
    .option("checkpointLocation", "s3a://bucketName/checkpointDir/")
    .foreachBatch{(batchDF: DataFrame, batchId: Long) =>
       // Transform and write batchDF
     }.start()

streamingQuery.awaitTermination()

【讨论】:

    【解决方案5】:

    您可以使用 s3 作为检查点,但您应该启用 EMRFS,以便处理 s3 一致性。

    【讨论】:

      猜你喜欢
      • 2018-06-22
      • 1970-01-01
      • 1970-01-01
      • 2023-03-10
      • 2021-06-02
      • 2021-12-03
      • 1970-01-01
      • 2018-03-18
      • 1970-01-01
      相关资源
      最近更新 更多