【问题标题】:Limit Kafka batches size when using Spark Streaming使用 Spark Streaming 时限制 Kafka 批量大小
【发布时间】:2017-02-20 06:07:03
【问题描述】:

是否可以限制 Kafka 消费者为 Spark Streaming 返回的批次大小?

我之所以问,是因为我得到的第一批记录有数亿条记录,并且需要很长时间来处理和检查它们。

【问题讨论】:

  • 您当前的批处理间隔是多少?如果它更多尝试减少批处理间隔,那么您可以获得更少的数据。
  • 忽略了你的问题,你提到了第一批..

标签: apache-spark apache-kafka spark-streaming kafka-consumer-api


【解决方案1】:

限制最大批量大小有助于控制处理时间,但会增加消息的处理延迟。

通过属性下面的设置,我们可以控制批量大小 spark.streaming.receiver.maxRate= spark.streaming.kafka.maxRatePerPartition=

您甚至可以通过启用背压来根据处理时间动态设置批处理大小 spark.streaming.backpressure.enabled:真 spark.streaming.backpressure.initialRate:

【讨论】:

    【解决方案2】:

    除了上面的答案。批量大小是 3 个参数的乘积

    1. batchDuration:流数据分批的时间间隔(以秒为单位)。
    2. spark.streaming.kafka.maxRatePerPartition:设置每个分区每秒的最大消息数。这与batchDuration 结合使用将控制批量大小。您希望将maxRatePerPartition 设置为很大(否则您实际上是在限制您的工作),而batchDuration 则非常小。
    3. kafka 主题中的分区数

    为了更好地解释此产品在启用/禁用背压时如何工作 (set spark.streaming.kafka.maxRatePerPartition for createDirectStream)

    【讨论】:

    • 这个答案比公认的答案更准确。
    • 这真的很有意义。谢谢你的澄清。但我的情况略有不同。在我的工作中,我从多个主题中消费。所以,问题是:NumberOfTopics X NumberOfPartitions X MaxRatePerPartition X BatchDuration 我想设置这个最大值。将 Spark 2.4 与 Direct Kafka Stream 结合使用。
    【解决方案3】:

    我认为Spark Streaming Backpressure可以解决您的问题。

    检查spark.streaming.backpressure.enabledspark.streaming.backpressure.initialRate

    默认情况下spark.streaming.backpressure.initialRate 未设置spark.streaming.backpressure.enabled 默认情况下禁用,所以我想 spark 会尽可能多地使用。

    来自Apache Spark Kafka configuration

    spark.streaming.backpressure.enabled:

    这使 Spark Streaming 能够根据接收速率控制 关于当前的批处理调度延迟和处理时间,以便 系统只能以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值 spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition 如果已设置(见下文)。

    而且由于您想控制第一批,或者更具体地-第一批的消息数量,我认为您需要spark.streaming.backpressure.initialRate

    spark.streaming.backpressure.initialRate:

    这是每个接收器的初始最大接收速率 在背压机制启动时接收第一批数据 已启用。

    当您的 Spark 作业(分别是 Spark 工作人员)能够处理来自 kafka 的 10000 条消息,但 kafka 代理向您的作业提供 100000 条消息时,这个很好。

    也许您也有兴趣查看spark.streaming.kafka.maxRatePerPartition 以及Jeroen van Wilgenburg on his blog 对这些属性的真实示例的一些研究和建议。

    【讨论】:

    • 这就是我想要的,谢谢。不幸的是,spark.streaming.backpressure.initialRate、spark.streaming.backpressure.enabled、spark.streaming.receiver.maxRate 和 spark.streaming.receiver.initialRate 都没有改变我获得的记录数量(我尝试了许多不同的组合)。唯一有效的配置是“spark.streaming.kafka.maxRatePerPartition”。这总比没有好,但是启用背压以进行自动缩放会很有用。你知道为什么背压不起作用吗?如何调试这个?
    • 也许 spark.streaming.backpressure.initialRate 可以工作,但正如 Jeroen van Wilgenburg 在他的博客中所注意到的那样“设置最大值是个好主意,因为背压算法不是即时的(这是不可能的)......麻烦当 Kafka 决定在最初几秒钟内为我们提供 50.000 条记录/秒时,使用 Kafka 输入的作业可以处理大约 1000 个事件/秒。” ..但我很困惑,因为没有工作。spark.streaming.backpressure.enabled 应该“在内部,动态设置接收器的最大接收速率”
    • 在最新的流媒体文档中,它提到设置spark.streaming.backpressure.enabled 会动态处理费率。 “在 Spark 1.5 中,我们引入了一个称为背压的功能,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动计算速率限制并在处理条件发生变化时动态调整它们。”,这可以解释为什么速率没有'如果该属性设置为 true,则不起作用。
    • 我正在使用带有 createStream API 的 spark 1.6.1 我也无法利用 spark.streaming.backpressure.enabled=true 这对我也不起作用,唯一的设置对我有用是 spark.streaming.receiver.maxRate
    • 这种方法适用于 Spark 结构化流吗?
    猜你喜欢
    • 2019-03-28
    • 2015-09-14
    • 2019-04-30
    • 2017-07-10
    • 2023-01-05
    • 2020-10-29
    • 2016-10-26
    • 2019-08-08
    • 1970-01-01
    相关资源
    最近更新 更多