【发布时间】:2016-03-04 07:44:49
【问题描述】:
我有一个 Spark Streaming 作业,它使用直接方法从 Kafka 集群读取数据。我无法理解处理时间的周期性峰值,也没有反映在 Spark UI 指标中。下图显示了这种模式(批处理时间 = 10s):
每次运行作业时都会重现此问题。 Kafka 日志中没有要读取的数据,因此没有要执行的真正处理。我希望这条线是平坦的,接近序列化并将任务发送给执行者的最小值。
模式是一个作业需要 9 秒(这有 5 秒的调度程序延迟),下一个作业需要 5 秒(没有调度程序延迟)接下来的两个作业大约需要 0.8 和 0.2 秒。
根据 Spark UI(除了调度程序延迟),9 秒和 5 秒的作业似乎没有做更多的工作。
这是 5 秒作业的任务时间总结:
没有一个执行者花费将近 5 秒的时间来完成他们的任务。
有其他人经历过这种情况吗?或者您对可能导致这种情况的原因有什么建议吗?
这是主要流代码的精简版:
def main(args: Array[String]): Unit = {
val (runtimeConfig: RuntimeConfig, cassandraConfig: CassandraConfig.type, kafkaConfig: KafkaConfig.type,
streamingContext: StreamingContext) = loadConfig(args)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaConfig.metadataBrokerList, "fetch.message.max.bytes" -> kafkaConfig.fetchMessageMaxBytes)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, Set(runtimeConfig.kafkaTopic))
val uuidGenerator = streamingContext.sparkContext.broadcast(Generators.timeBasedGenerator(EthernetAddress.fromInterface()))
runtimeConfig.kafkaTopic match {
case Topics.edges => saveEdges(runtimeConfig, messages, uuidGenerator)
case Topics.messages => {val formatter = streamingContext.sparkContext.broadcast(DateTimeFormat.forPattern(AppConfig.dateFormat))
saveMessages(cassandraConfig, runtimeConfig, messages, formatter)}
}
streamingContext.start()
streamingContext.awaitTermination()
}
def saveEdges(runtimeConfig: RuntimeConfig, kafkaStream: DStream[(String, String)],
uuidGenerator: Broadcast[TimeBasedGenerator]): Unit = {
val edgesMessages = kafkaStream.flatMap(msg => {
implicit val formats = DefaultFormats
parse(msg._2).extract[List[EdgeMessage]].flatMap(em => (List.fill(em.ids.size)(em.userId) zip em.ids))
}).map(edge => Edge(edge._1, edge._2)).saveAsTextFiles("tester", ".txt")
}
火花设置:
val conf = new SparkConf()
.set("spark.mesos.executor.home", AppConfig.sparkHome)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.kafka.maxRatePerPartition", "1")
.set("spark.streaming.blockInterval", "500")
.set("spark.cores.max", "36")
相关的 build.sbt 提取:
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1",
"org.apache.spark" %% "spark-core" % "1.5.1",
"org.apache.spark" %% "spark-streaming" % "1.5.1",
"org.apache.spark" %% "spark-graphx" % "1.5.1",
- Kafka 版本:2-10-0.8.2.1
- 资源管理器:Mesos 0.23
- 集群详细信息:6 个 Spark Workers、6 个 Kafka Brokers、5 个节点 Zookeeper Ensemble(在同一台机器上)。 12 个 Kafka 分区。
注意:sparktmp 和 kafka-logs 通常位于每个节点上的相同旋转磁盘上。
【问题讨论】:
标签: apache-spark apache-kafka spark-streaming