【发布时间】:2017-12-31 14:21:48
【问题描述】:
我们有一个长时间运行的 Spark Structured Streaming 查询,它正在从 Kafka 读取数据,我们希望此查询在重新启动后从中断处继续。但是,我们已将startingOffsets 设置为“earliest”,重新启动后我们看到查询再次从 Kafka 主题的开头读取。
我们的基本查询如下所示:
val extract = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server:port")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.load()
val query: StreamingQuery = extract
.writeStream
.option("checkpointLocation", s"/tmp/checkpoint/kafka/")
.foreach(writer)
.start()
我们看到检查点目录被正确创建,并且在偏移文件中具有我们期望的偏移量。
当我们重新启动时,我们会看到如下消息:
25-07-2017 14:35:32 INFO ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver
我们告诉查询从“earliest”开始,但文档说:
这不应该意味着重新启动我们的应用程序会导致查询从中断的地方恢复吗?
Spark Structured Streaming 不允许为 Kafka 设置“group.id”。看到这个:Note that the following Kafka params cannot be set and the Kafka source will throw an exception.
我尝试添加queryName,以防它被用于跨运行识别查询但没有任何效果。
我们在 YARN 上使用 Spark 2.1。
关于为什么这不起作用或我们做错了什么的任何想法?
使用日志更新:
【问题讨论】:
-
日志没有任何意义。 Spark 将为每次运行创建一个唯一的 ID 作为组 ID。您可以使用
StreamingQuery.recentProgress打印最近的进度,它应该包含偏移信息。 -
这没有多大帮助。它运行良好。它只是从第一个偏移量开始,而不是重新启动时的最后一次读取。
-
上一次运行只有一批?如果是这样,在 2.2.0 之前,Spark 总是会重新运行上一次运行的最后一批。
-
在我看来,Spark 在重新启动时根本没有拾取检查点目录。我在 1) 删除检查点目录并运行应用程序和 2) 在处理完所有消息后停止应用程序,然后使用看起来不错的检查点目录重新启动时没有发现差异。在这两种情况下,它都从最早的 Kafka 消息开始。我认为我做错了什么,但不知道是什么。
-
您介意分享日志吗?
标签: scala apache-spark spark-structured-streaming