【问题标题】:Dynamic resource allocation for spark applications not working火花应用程序的动态资源分配不起作用
【发布时间】:2020-01-14 05:33:30
【问题描述】:

我是 Spark 的新手,并试图弄清楚动态资源分配的工作原理。我有 spark 结构化流应用程序,它试图一次从 Kafka 读取数百万条记录并处理它们。我的应用程序总是以 3 个执行器开始,并且从不增加执行器的数量。

完成处理需要 5-10 分钟。我认为它会增加执行者的数量(最多 10 个)并尝试更快地完成处理,但这并没有发生。我在这里错过了什么?这应该如何工作?

我在 Ambari for Spark 中设置了以下属性

spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.initialExecutors = 3
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.minExecutors = 3
spark.shuffle.service.enabled = true

下面是我的提交命令的样子

/usr/hdp/3.0.1.0-187/spark2/bin/spark-submit --class com.sb.spark.sparkTest.sparkTest --master yarn --deploy-mode cluster --queue default sparkTest-assembly-0.1.jar

火花代码

//read stream
val dsrReadStream = spark.readStream.format("kafka")
   .option("kafka.bootstrap.servers", brokers) //kafka bokers
   .option("startingOffsets", startingOffsets) // start point to read
   .option("maxOffsetsPerTrigger", maxoffsetpertrigger) // no. of records per batch
   .option("failOnDataLoss", "true")

 /****
 Logic to validate format of loglines. Writing invalid log lines to kafka and store valid log lines in 'dsresult'

 ****/

//write stream
val dswWriteStream =dsresult.writeStream
    .outputMode(outputMode) // file write mode, default append
    .format(writeformat) // file format ,default orc
    .option("path",outPath) //hdfs file write path
    .option("checkpointLocation", checkpointdir) location
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime(triggerTimeInMins))

【问题讨论】:

  • 你有多少个Kafka分区?
  • 3个kafka分区
  • 因此,您的应用程序没有理由请求更多资源。三个 Kafka 分区 -> 3 个 Spark 分区 -> 3 个执行器,即使在单核上,每个都足以实现最大并行度。
  • 目前使用 3 个执行器处理百万条记录大约需要 5-7 分钟。我已经用 6 个执行器进行了相同的测试,并且花费的时间相对较少。如果我将 max executors 设置为 10,它不应该动态使用更多的 executors(超过 3 个,如果有的话)来增加处理时间吗?我理想的预期处理时间小于 2 分钟。我应该增加初始执行者来实现这一点吗?请告知。

标签: apache-spark


【解决方案1】:

动态资源分配不适用于 Spark Streaming

Refer this link

【讨论】:

  • spark.streaming.dynamicAllocation.enabled=true ?
【解决方案2】:

只是为了进一步澄清,

spark.streaming.dynamicAllocation.enabled=true

仅适用于 Dstreams API。见Jira

另外,如果你设置了

spark.dynamicAllocation.enabled=true

并运行结构化流式作业,批处理动态分配算法启动,这可能不是非常优化。见Jira

【讨论】:

    猜你喜欢
    • 2018-04-13
    • 1970-01-01
    • 2021-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多