【问题标题】:Spark Structured Streaming not restarting at Kafka offsetsSpark Structured Streaming 未在 Kafka 偏移处重新启动
【发布时间】:2017-12-31 14:21:48
【问题描述】:

我们有一个长时间运行的 Spark Structured Streaming 查询,它正在从 Kafka 读取数据,我们希望此查询在重新启动后从中断处继续。但是,我们已将startingOffsets 设置为“earliest”,重新启动后我们看到查询再次从 Kafka 主题的开头读取。

我们的基本查询如下所示:

  val extract = sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "server:port")
    .option("subscribe", "topic")
    .option("startingOffsets", "earliest")
    .load()

  val query: StreamingQuery = extract 
    .writeStream
    .option("checkpointLocation", s"/tmp/checkpoint/kafka/")
    .foreach(writer)
    .start()

我们看到检查点目录被正确创建,并且在偏移文件中具有我们期望的偏移量。

当我们重新启动时,我们会看到如下消息:

25-07-2017 14:35:32 INFO  ConsumerCoordinator:231 - Setting newly assigned partitions [KafkaTopic-2, KafkaTopic-1, KafkaTopic-0, KafkaTopic-3] for group spark-kafka-source-dedc01fb-c0a7-40ea-8358-a5081b961968--1396947302-driver

我们告诉查询从“earliest”开始,但文档说:

This only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off.

这不应该意味着重新启动我们的应用程序会导致查询从中断的地方恢复吗?

Spark Structured Streaming 不允许为 Kafka 设置“group.id”。看到这个:Note that the following Kafka params cannot be set and the Kafka source will throw an exception.

我尝试添加queryName,以防它被用于跨运行识别查询但没有任何效果。

我们在 YARN 上使用 Spark 2.1。

关于为什么这不起作用或我们做错了什么的任何想法?

使用日志更新:

From the Driver

From the Worker

【问题讨论】:

  • 日志没有任何意义。 Spark 将为每次运行创建一个唯一的 ID 作为组 ID。您可以使用StreamingQuery.recentProgress打印最近的进度,它应该包含偏移信息。
  • 这没有多大帮助。它运行良好。它只是从第一个偏移量开始,而不是重新启动时的最后一次读取。
  • 上一次运行只有一批?如果是这样,在 2.2.0 之前,Spark 总是会重新运行上一次运行的最后一批。
  • 在我看来,Spark 在重新启动时根本没有拾取检查点目录。我在 1) 删除检查点目录并运行应用程序和 2) 在处理完所有消息后停止应用程序,然后使用看起来不错的检查点目录重新启动时没有发现差异。在这两种情况下,它都从最早的 Kafka 消息开始。我认为我做错了什么,但不知道是什么。
  • 您介意分享日志吗?

标签: scala apache-spark spark-structured-streaming


【解决方案1】:

首先,你为什么说检查点目录又被创建了。您是在初次运行后删除它然后恢复它吗?

所以,为了清楚起见,“.option("startingOffsets", "earliest")" 设置将在您第一次启动查询时从头开始读取。 并认为出了点问题,流停止了。您修复它并再次启动流(不删除检查点目录),流应该从之前停止的偏移量开始。

如果您删除了检查点目录然后恢复了流,显然它不会读取任何偏移历史记录(因为您已经删除了检查点),因此将从第一个(最早)偏移量开始在卡夫卡上可用。

【讨论】:

    猜你喜欢
    • 2021-05-22
    • 2020-09-03
    • 2017-02-06
    • 2019-07-30
    • 2018-04-26
    • 2017-06-22
    • 2018-09-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多