【问题标题】:Spark 3 structured streaming use maxOffsetsPerTrigger in Kafka source with Trigger.OnceSpark 3结构化流在Kafka源中使用maxOffsetsPerTrigger和Trigger.Once
【发布时间】:2021-03-14 12:27:37
【问题描述】:

我们需要在 Kafka 源中使用 maxOffsetsPerTrigger 和结构化流中的 Trigger.Once(),但基于此 issue 似乎在 spark 3 中读取 allAvailable。在这种情况下有没有办法实现速率限制?

这是 spark 3 中的示例代码:

def options: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
  "subscribe" -> conf.getString("topic")
) ++
  Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
  .load
  .writeStream
  .trigger(Trigger.Once)
  .start()

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming spark-kafka-integration


    【解决方案1】:

    没有其他方法可以正确设置速率限制。如果maxOffsetsPerTrigger 不适用于带有Once 触发器的流式作业,您可以执行以下操作以获得相同的结果:

    1. 选择另一个触发器并使用maxOffsetsPerTrigger 限制速率并在完成处理所有数据后手动终止此作业。

    2. 使用选项startingOffsetsendingOffsets 同时使作业成为批处理 作业。重复,直到您处理完主题中的所有数据。但是,“RunOnce 模式下的 Streaming 优于 Batch”是有原因的,详见 here

    最后一个选择是查看链接的拉取请求并自行编译 Spark。

    【讨论】:

      【解决方案2】:

      这是我们如何“解决”这个问题的。这基本上是mike 在接受的答案中所写的方法。

      在我们的例子中,消息的大小变化很小,因此我们知道处理一个批处理需要多少时间。简而言之,我们:

      • Trigger.Once() 更改为Trigger.ProcessingTime(<ms>),因为maxOffsetsPerTrigger 在此模式下工作
      • 通过调用awaitTermination(<ms>) 来模仿Trigger.Once() 终止了这个正在运行的查询
      • 将处理间隔设置为大于终止间隔,以便恰好适合处理一个“批次”
      
      val kafkaOptions = Map[String, String](
            "kafka.bootstrap.servers" -> "localhost:9092",
            "failOnDataLoss" -> "false",
            "subscribePattern" -> "testTopic",
            "startingOffsets" -> "earliest",
            "maxOffsetsPerTrigger" -> "10",  // "batch" size
          )
      
      val streamWriterOptions = Map[String, String](
          "checkpointLocation" -> "path/to/checkpoints",
        )
      
      val processingInterval = 30000L
      val terminationInterval = 15000L
      
      sparkSession
            .readStream
            .format("kafka")
            .options(kafkaOptions)
            .load()
            .writeStream
            .options(streamWriterOptions)
            .format("Console")
            .trigger(Trigger.ProcessingTime(processingInterval))
            .start()
            .awaitTermination(terminationInterval)
      
      

      这是可行的,因为第一批将根据maxOffsetsPerTrigger 限制进行读取和处理。说,在 10 秒内。然后开始处理第二批,但它在大约 5 秒后在操作中间终止,并且从未达到设置的 30 秒标记。但它正确存储了偏移量。在下一次运行中拾取并处理这个“被杀死”的批次。

      这种方法的一个缺点是您必须大致知道处理一个“批次”需要多少时间 - 如果您将 terminationInterval 设置得太低,则作业的输出将始终为零。

      当然,如果您不关心在一次运行中处理的确切批次数,您可以轻松地将processingInterval 调整为比terminationInterval 小几倍。在这种情况下,您可以一次性处理不同数量的批次,但仍然尊重maxOffsetsPerTrigger 的值。

      【讨论】:

        猜你喜欢
        • 2018-01-24
        • 2017-03-31
        • 2023-04-06
        • 1970-01-01
        • 2020-01-31
        • 2023-03-20
        • 2018-11-14
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多