【问题标题】:How spark streaming application works when it fails?Spark Streaming 应用程序在失败时如何工作?
【发布时间】:2022-06-10 23:39:49
【问题描述】:

我开始学习使用 kinesis 的 Spark 流应用程序。我遇到了一个案例,我们的 Spark 流应用程序失败,它重新启动,但问题是,当它重新启动时,它尝试处理的消息数量超过了它可以处理的数量并再次失败。所以,

  1. 有什么方法可以限制 Spark 流应用程序可以处理的数据量(以字节为单位)?
  2. 总而言之,如果 Spark 流应用程序失败并保持关闭 1 或 2 小时,并且 InitialPositionInStream 设置为 TRIM_HORIZON,那么当它重新启动时,它将从最后处理的消息开始在 kinesis 流中,但由于 kinesis 中正在进行实时摄取,那么 spark 流应用程序如何处理 kinesis 中存在的这 1 或 2 小时的数据以及在 kinesis 中摄取的实时数据?

PS - 火花流在 EMR 中运行,批量大小设置为 15 秒,kinesis CheckPointInterval 设置为 60 秒,每 60 秒后它会将处理后的数据详细信息写入 DynamoDB。

如果我的问题不清楚,或者您需要更多信息来回答我的问题,请告诉我。

火花流运动

谢谢..

【问题讨论】:

    标签: amazon-web-services apache-spark spark-streaming aws-kinesis


    【解决方案1】:

    假设您正在尝试从消息队列(如 kafka 或事件中心)中读取数据。 如果是这样的话,当 Spark 流应用程序出现故障时,它会尝试从它在失败之前留下的偏移量处处理数据。

    到那时,您重新启动作业 - 它会积累更多数据,它会尝试处理所有积压数据,并且会因内存不足或执行程序丢失而失败。

    为了防止这种情况,您可以使用类似“maxOffsetsPerTrigger”的配置,通过阻止作业一次读取所有数据来创建一个背压机制。它将简化数据提取和处理。

    更多详情可以在这里找到:https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

    来自官方文档

    每个触发器处理的最大偏移数的速率限制 间隔。指定的偏移总数将成比例 跨不同卷的 topicPartitions 拆分。

    设置每个触发器的最大偏移量的示例

     val df = spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1")
        .option("subscribe", "topicName")
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", "10000")
        .load()
    

    【讨论】:

      猜你喜欢
      • 2016-10-11
      • 1970-01-01
      • 1970-01-01
      • 2017-04-02
      • 1970-01-01
      • 2021-11-17
      • 2015-04-17
      • 2017-03-16
      • 1970-01-01
      相关资源
      最近更新 更多