【问题标题】:Spark Streaming Kafka direct stream processing time performance spikesSpark Streaming Kafka 直接流处理时间性能峰值
【发布时间】:2016-03-04 07:44:49
【问题描述】:

我有一个 Spark Streaming 作业,它使用直接方法从 Kafka 集群读取数据。我无法理解处理时间的周期性峰值,也没有反映在 Spark UI 指标中。下图显示了这种模式(批处理时间 = 10s):

每次运行作业时都会重现此问题。 Kafka 日志中没有要读取的数据,因此没有要执行的真正处理。我希望这条线是平坦的,接近序列化并将任务发送给执行者的最小值。

模式是一个作业需要 9 秒(这有 5 秒的调度程序延迟),下一个作业需要 5 秒(没有调度程序延迟)接下来的两个作业大约需要 0.8 和 0.2 秒。

根据 Spark UI(除了调度程序延迟),9 秒和 5 秒的作业似乎没有做更多的工作。

这是 5 秒作业的任务时间总结:

没有一个执行者花费将近 5 秒的时间来完成他们的任务。

有其他人经历过这种情况吗?或者您对可能导致这种情况的原因有什么建议吗?

这是主要流代码的精简版:

def main(args: Array[String]): Unit = {
    val (runtimeConfig: RuntimeConfig, cassandraConfig: CassandraConfig.type, kafkaConfig: KafkaConfig.type,
streamingContext: StreamingContext) = loadConfig(args)

    val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaConfig.metadataBrokerList, "fetch.message.max.bytes" -> kafkaConfig.fetchMessageMaxBytes)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, Set(runtimeConfig.kafkaTopic))
    val uuidGenerator = streamingContext.sparkContext.broadcast(Generators.timeBasedGenerator(EthernetAddress.fromInterface()))

    runtimeConfig.kafkaTopic match {
      case Topics.edges => saveEdges(runtimeConfig, messages, uuidGenerator)
      case Topics.messages => {val formatter = streamingContext.sparkContext.broadcast(DateTimeFormat.forPattern(AppConfig.dateFormat))
    saveMessages(cassandraConfig, runtimeConfig, messages, formatter)}
    }
    streamingContext.start()
    streamingContext.awaitTermination()
}

def saveEdges(runtimeConfig: RuntimeConfig, kafkaStream: DStream[(String, String)],
               uuidGenerator: Broadcast[TimeBasedGenerator]): Unit = {
      val edgesMessages = kafkaStream.flatMap(msg => {
    implicit val formats = DefaultFormats
    parse(msg._2).extract[List[EdgeMessage]].flatMap(em => (List.fill(em.ids.size)(em.userId) zip em.ids))
  }).map(edge => Edge(edge._1, edge._2)).saveAsTextFiles("tester", ".txt")
}

火花设置:

val conf = new SparkConf()
.set("spark.mesos.executor.home", AppConfig.sparkHome)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.kafka.maxRatePerPartition", "1")
.set("spark.streaming.blockInterval", "500")
.set("spark.cores.max", "36")

相关的 build.sbt 提取:

"org.apache.spark" % "spark-streaming-kafka_2.10"  % "1.5.1",
"org.apache.spark" %% "spark-core" % "1.5.1",
"org.apache.spark" %% "spark-streaming" % "1.5.1",
"org.apache.spark" %% "spark-graphx" % "1.5.1",
  • Kafka 版本:2-10-0.8.2.1
  • 资源管理器:Mesos 0.23
  • 集群详细信息:6 个 Spark Workers、6 个 Kafka Brokers、5 个节点 Zookeeper Ensemble(在同一台机器上)。 12 个 Kafka 分区。

注意:sparktmpkafka-logs 通常位于每个节点上的相同旋转磁盘上。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming


    【解决方案1】:

    问题似乎出在 Mesos 调度程序上。我不确定为什么它开始减慢这样的工作。但是我重新启动了 Mesos 集群,现在锯齿处理时间已经过去了。

    正如您在此处看到的,处理时间现在更加稳定:

    【讨论】:

    • 您在哪里使用哪种调度模式? coarsefine-grained ?
    • @maasg 我希望我能记得我当时使用的是哪一个。我想这对我来说很有意义,因为我可能正在使用细粒度,因为它正在执行更多动态调度并且具有更高的任务创建开销,但我现在真的不知道我当时使用的是哪一个。
    猜你喜欢
    • 2016-03-12
    • 2017-02-06
    • 2019-06-28
    • 2020-10-10
    • 2021-05-22
    • 2020-07-25
    • 1970-01-01
    • 1970-01-01
    • 2017-06-24
    相关资源
    最近更新 更多