【问题标题】:Spark Streaming Kafka direct consumer consumption speed dropSpark Streaming Kafka 直接消费者消费速度下降
【发布时间】:2017-05-04 11:37:48
【问题描述】:

Kafka 直接消费者开始将读取限制为每批(5 秒)读取 450 个事件(5 * 90 个分区),在此之前它运行良好 1 或 2 天(每批大约 5000 到 40000 个事件)

我正在使用在 AWS 中运行的 spark 独立集群(spark 和 spark-streaming-kafka 版本 1.6.1)并使用 S3 存储桶作为检查点目录StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext),每个工作节点上没有调度延迟和足够的磁盘空间.

没有改变任何Kafka客户端初始化参数,很确定kafka的结构没有改变:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker)
val topics = Set(kafkaConfig.topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

也无法理解为什么当直接消费者描述说The consumed offsets are by the stream itself我在创建流上下文时仍然需要使用检查点目录?

【问题讨论】:

  • spark.streaming.backpressure.enabled 是否设置为 true
  • 是的,我会尝试禁用它
  • 看起来有帮助
  • 很高兴知道 :)

标签: scala amazon-web-services apache-spark apache-kafka spark-streaming


【解决方案1】:

这通常是通过将 spark.streaming.backpressure.enabled 设置为 true 来启用背压的结果。通常,当背压算法看到有更多数据进入时,它会开始将每个批次限制在一个相当小的大小,直到它可以再次“稳定”自己。这有时会出现误报并导致您的流降低处理速度。

如果你想稍微调整一下启发式,它使用了一些未记录的标志(只要确保你知道你在做什么):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)

如果您想了解血淋淋的细节,PIDRateEstimator 就是您要找的。​​p>

【讨论】:

    猜你喜欢
    • 2017-02-23
    • 2014-12-30
    • 2018-12-18
    • 1970-01-01
    • 1970-01-01
    • 2016-07-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多