【发布时间】:2020-01-14 05:33:30
【问题描述】:
我是 Spark 的新手,并试图弄清楚动态资源分配的工作原理。我有 spark 结构化流应用程序,它试图一次从 Kafka 读取数百万条记录并处理它们。我的应用程序总是以 3 个执行器开始,并且从不增加执行器的数量。
完成处理需要 5-10 分钟。我认为它会增加执行者的数量(最多 10 个)并尝试更快地完成处理,但这并没有发生。我在这里错过了什么?这应该如何工作?
我在 Ambari for Spark 中设置了以下属性
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.initialExecutors = 3
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.minExecutors = 3
spark.shuffle.service.enabled = true
下面是我的提交命令的样子
/usr/hdp/3.0.1.0-187/spark2/bin/spark-submit --class com.sb.spark.sparkTest.sparkTest --master yarn --deploy-mode cluster --queue default sparkTest-assembly-0.1.jar
火花代码
//read stream
val dsrReadStream = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", brokers) //kafka bokers
.option("startingOffsets", startingOffsets) // start point to read
.option("maxOffsetsPerTrigger", maxoffsetpertrigger) // no. of records per batch
.option("failOnDataLoss", "true")
/****
Logic to validate format of loglines. Writing invalid log lines to kafka and store valid log lines in 'dsresult'
****/
//write stream
val dswWriteStream =dsresult.writeStream
.outputMode(outputMode) // file write mode, default append
.format(writeformat) // file format ,default orc
.option("path",outPath) //hdfs file write path
.option("checkpointLocation", checkpointdir) location
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime(triggerTimeInMins))
【问题讨论】:
-
你有多少个Kafka分区?
-
3个kafka分区
-
因此,您的应用程序没有理由请求更多资源。三个 Kafka 分区 -> 3 个 Spark 分区 -> 3 个执行器,即使在单核上,每个都足以实现最大并行度。
-
目前使用 3 个执行器处理百万条记录大约需要 5-7 分钟。我已经用 6 个执行器进行了相同的测试,并且花费的时间相对较少。如果我将 max executors 设置为 10,它不应该动态使用更多的 executors(超过 3 个,如果有的话)来增加处理时间吗?我理想的预期处理时间小于 2 分钟。我应该增加初始执行者来实现这一点吗?请告知。
标签: apache-spark