【问题标题】:Spark Streaming -> DStream.checkpoint versus SparkStreaming.checkpointSpark Streaming -> DStream.checkpoint vs Spark Streaming.checkpoint
【发布时间】:2016-10-09 20:07:35
【问题描述】:

我有 Spark 1.4 Streaming 应用程序,它从 Kafka 读取数据,使用有状态转换,批处理间隔为 15 秒。

为了使用全状态转换以及从驱动程序故障中恢复,我需要在流上下文中设置检查点。

此外,在 Spark 1.4 文档中,他们建议将 DStream 检查点设置为批处理间隔的 5-10 倍。

所以我的问题是:

如果我只在 spark 流上下文中设置检查点会发生什么?我猜 DStreams 会在每个批次间隔被检查点?

如果我在流上下文中设置检查点以及从 Kafka 读取数据的那一刻,我会设置:

DStream.checkpoint(90 秒)

元数据检查点和数据检查点(即 DStreams)的间隔是多少?

谢谢。

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    我猜 DStreams 会在每个批次间隔被检查点?

    不,Spark 会在每个批次间隔乘以一个常数时检查您的数据。这意味着如果您的批处理间隔为 15 秒,则每隔 15 秒的倍数将检查一次数据。以mapWithState为例,这是一个有状态的流,你可以看到批处理间隔乘以10:

    private[streaming] object InternalMapWithStateDStream {
      private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
    }
    

    元数据检查点的间隔是什么? 数据检查点(意思是 DStreams)?

    如果您在DStream 上将检查点持续时间设置为 90 秒,那么这将是您的检查点持续时间,这意味着每 90 秒数据将获得一次检查点。您不能直接在StreamingContext 上设置检查点持续时间,您只能通过检查点目录。 checkpoint 的重载只需要一个String

    /**
     * Set the context to periodically checkpoint the DStream operations for driver
     * fault-tolerance.
     * @param directory HDFS-compatible directory where the checkpoint
     *        data will be reliably stored.
     *        Note that this must be a fault-tolerant file system like HDFS.
     */
    def checkpoint(directory: String)
    

    编辑

    对于updateStateByKey,好像checkpoint的时间设置为批处理时间乘以Seconds(10) / slideDuration

    // Set the checkpoint interval to be slideDuration or 10 seconds,
    // which ever is larger
    if (mustCheckpoint && checkpointDuration == null) {
      checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
      logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
    }
    

    【讨论】:

    • 感谢您的回答。你知道 updateStateByKey 传输的检查点间隔是多少吗?
    • 好的,所以对于我的 15 秒批处理间隔,检查点间隔为:15*math.celi(10/15) = 15 秒。您认为这是最优的还是我应该考虑手动更改间隔(将其设置为更大)?
    • @SrdjanNikitovic 这真的取决于你的工作,你有多少传入数据,有多少工作节点处理,等等。
    • 非常感谢您的回答。
    • 抱歉我太天真了,但是如果检查点持续时间是批次持续时间的倍数,而不是每个批次。那么在持续时间之间出现的批次会发生什么?
    猜你喜欢
    • 2017-12-28
    • 1970-01-01
    • 2020-03-19
    • 2016-06-13
    • 2017-12-30
    • 2015-12-18
    • 2018-09-10
    • 1970-01-01
    • 2019-08-08
    相关资源
    最近更新 更多