【Question Title】: Spark structured streaming from Kafka - last message processed again after resume from checkpoint
【Posted】: 2017-05-09 02:05:45
【Question】:

I am using the brand-new (and flagged "alpha") Structured Streaming of Spark 2.0.2 to read messages from a Kafka topic and update a couple of Cassandra tables from them:

val readStream = sparkSession.readStream
  .format("kafka")
  .option("subscribe", "maxwell")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .as[KafkaMessage]
  .map(<transform KafkaMessage to Company>)

val writeStream = readStream
  .writeStream
  .queryName("CompanyUpdatesInCassandra")
  .foreach(new ForeachWriter[Company] {
    def open(partitionId: Long, version: Long): Boolean = {
      true
    }

    def process(company: Company): Unit = {
      ...
    }

    def close(errorOrNull: Throwable): Unit = {}
  })
  .start

writeStream.awaitTermination

I also configured a checkpoint location ("spark.sql.streaming.checkpointLocation") on the sparkSession. That way, after a restart, the streaming application picks up the messages that arrived while it was down.
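For reference, the checkpoint location can be set either session-wide or per query; a minimal sketch (the paths are placeholders):

```scala
// Session-wide default, applied to every query started from this session
sparkSession.conf.set("spark.sql.streaming.checkpointLocation", "/some/checkpoint/dir")

// Or per query, which takes precedence over the session-wide setting
readStream
  .writeStream
  .option("checkpointLocation", "/some/checkpoint/dir/company-updates")
```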

However, since configuring this checkpoint location, I have noticed that on resume it also consistently reprocesses the last message of the previous batch, even though that message was already processed correctly without failure.

Any idea what I am doing wrong here? This seems like a very common use case.

More info:

See the relevant logs here (offset 5876 is the last offset that was successfully processed by the previous batch):

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]}
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]}
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)]
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map()
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)]
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None)
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31)
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0]  1

Also, when I kill the stream, I make sure it is stopped gracefully to avoid data loss:

sys.ShutdownHookThread {
  writeStream.stop
  sparkSession.stop
}

【Comments】:

    Tags: scala apache-spark apache-kafka spark-structured-streaming


    【Solution 1】:

    Currently, Structured Streaming checkpoints its state when new offsets are generated. So the situation you describe is expected: the last committed batch may be reprocessed after a resume. However, this is an internal implementation detail. Even if we checkpointed when committing a batch, the checkpoint itself could still fail, and your sink, the ForeachWriter, would need to handle that case as well.

    In general, your sink should always be idempotent.
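To make the idempotency requirement concrete, here is a minimal, self-contained sketch (not the asker's actual writer; the `Company` fields and the in-memory "table" are assumptions standing in for a Cassandra table). Because each write is a keyed upsert rather than an append, redelivering the same message after a checkpoint resume leaves the table unchanged:

```scala
import scala.collection.mutable

// Hypothetical record type; the real case class is not shown in the question
case class Company(id: String, name: String)

// In-memory stand-in for a Cassandra table: writes are upserts keyed by
// primary key, so processing the same message twice has no extra effect
class IdempotentCompanySink {
  private val table = mutable.Map.empty[String, Company]

  def process(company: Company): Unit =
    table(company.id) = company // upsert, not append

  def snapshot: Map[String, Company] = table.toMap
}

val sink = new IdempotentCompanySink
val msg = Company("42", "Acme")
sink.process(msg) // first delivery
sink.process(msg) // redelivery after resuming from the checkpoint
```

A Cassandra `INSERT` with a fixed primary key behaves the same way, which is why this sink tolerates the replayed batch.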

    Update: in Spark 2.2.0, Structured Streaming no longer reruns a batch after resuming if it completed successfully.

    【Discussion】:

    • I see. I was under the impression that the last batch would only be reprocessed if something actually went wrong between committing the batch and performing the checkpoint. But in practice it is no big deal, since the ForeachWriter has to be idempotent anyway. Thanks!
    • Right now it is really just an internal simplification (we mark a batch as complete by starting the next one). I think we may optimize this in the future. As you said, if you care about exactly-once semantics, you should still make your Writer idempotent.