【发布时间】:2020-05-30 00:57:43
【问题描述】:
我已经为我的结构化流应用程序启用了 WAL。我在哪里可以找到 WAL 日志的位置? 我可以在前缀 receivedBlockMetadata 中看到我的 Spark 流处理的 WAL。但是,我没有看到为结构化流创建任何前缀
【问题讨论】:
标签: apache-spark spark-streaming spark-structured-streaming
我已经为我的结构化流应用程序启用了 WAL。我在哪里可以找到 WAL 日志的位置? 我可以在前缀 receivedBlockMetadata 中看到我的 Spark 流处理的 WAL。但是,我没有看到为结构化流创建任何前缀
【问题讨论】:
标签: apache-spark spark-streaming spark-structured-streaming
据我了解,WAL 仅适用于 spark 流式传输,不适用于结构化流式传输。 结构化流实现基于检查点的容错,如 flink 全局状态。检查点存储所有状态,包括 kafka 偏移量等。位置在您的代码中指定。
【讨论】:
在 Spark Structure Streaming 中,现在 WAL 包含来自接收方的每条消息。
每个批次只有两个带有元数据的日志:偏移和提交日志。
您可以在org.apache.spark.sql.execution.streaming.StreamExecution 中找到实现的详细信息。 ->
/**
* A write-ahead-log that records the offsets that are present in each batch. In order to ensure
* that a given batch will always consist of the same data, we write to this log *before* any
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
/**
* A log that records the batch ids that have completed. This is used to check if a batch was
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
它们都在文件夹偏移和提交中的checkpointLocation 中可用。
在 Structure Streaming 日志中只包含偏移信息。
【讨论】: