【问题标题】:sparklyr spark_apply is very slowsparklyr spark_apply 很慢
【发布时间】:2019-04-25 11:36:15
【问题描述】:

sparklyr spark_apply 非常慢/根本没有响应。签入 spark UI 时,正在执行的阶段是在 utils.scala:204 处收集的。它正在执行 0/1(1 个运行)任务。应用 spark_apply 的数据框有 30 个分区。此任务没有任何进展,以及为什么正在执行单个任务

library(sparklyr)
library(dplyr)
config=spark_config()
config=c(config, list("spark.files"="hdfs:///bundle/packages.tar","spark.dynamicAllocation.enabled"="false","spark.executor.memory"="10g","spark.executor.cores"="4","spark.executor.instances"="7"))
sc <- spark_connect(master="yarn", app_name = "demo",config = config,version="2.3.0")
demo_data <- spark_read_csv(sc,name='demo_data',path = '/data.txt',delimiter = '\t',infer_schema = FALSE, columns = list(column1 = "integer"))
spark_apply(demo_data, function(df) df * 10, packages = "packages.tar" ,columns=list(column1="integer"))

【问题讨论】:

    标签: apache-spark sparklyr


    【解决方案1】:

    我的诀窍是在集群初始化之后运行一个最小的 spark_apply 函数(来自 rstudio.com):spark_apply(function(e) I(e)) 用于从一开始就在每个节点上初始化 R 环境。

    在“用 R 掌握火花”一书中 javier luraschi 建议使用 spark_apply as a last resort,即使是 following explainations,也表明为了克服瓶颈已投入大量资金,尤其是箭头库的开发。

    也许此时他应该提到更合适的方式(参见Spark and Sparklyr)来使用 sparklyr 运行并行任务。

    在 github 上,他解释说 spark_apply 遇到了序列化问题,正如 herehere 所解释的那样

    另一方面,randomgambit 认为性能问题是由于 Sparklyr 行为在每个节点上复制整个 R 分布。

    就我而言,问题不在本地模式下, 但是在集群模式下第一次执行 spark_apply 时:

    使用来自rstudio.com 的测试命令 基准测试给出了以下性能来运行这个表达式

    sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e)) 
    
    • 本地模式:

      Unit: seconds min lq mean median uq max neval 5.043947 5.043947 5.043947 5.043947 5.043947 5.043947 1

    • 集群模式(一个master,一个worker):

      • 第一次执行

      Unit: seconds min lq mean median uq max neval 928.0637 928.0637 928.0637 928.0637 928.0637 928.0637 1

      • 第二次执行

      Unit: seconds min lq mean median uq max neval 4.309775 4.309775 4.309775 4.309775 4.309775 4.309775 1

    鉴于第二次执行要快得多,我认为 Sparklyr 在每个节点上复制整个 R 分布需要 923 秒 = 15 分 23 秒

    这里是使用的代码:

    library(dplyr) 
    library(sparklyr) 
    library(microbenchmark)
    sc <- spark_connect(master = "local")
    
    microbenchmark(
      sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
      ,times = 1L)    
    
    conf <- spark_config()
    conf[["spark.r.command"]] <- "d:/path_to/R-3.6.1/bin/Rscript.exe"
    sc <- spark_connect(master="spark://192.168.0.12:7077", 
                                            version = "2.4.3",
                                            spark_home = "C:\\Users\\username\\AppData\\Local\\spark\\spark-2.4.3-bin-hadoop2.7",
                                            config = conf)
    microbenchmark(
      sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
      ,times = 1L)
    microbenchmark(
      sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
      ,times = 1L)
    

    【讨论】:

    • 与 spark-2.4.4-bin-hadoop2.7 相同
    • 事实上,spark_apply 只有在你不能将你的 R 代码转换成 dplyr 的语法时才应该使用,在这种情况下,你的代码应该被 sparklyr 转换成 spark 例程。相反,spark_apply 在 spark 节点上启动一个完整的 R 环境。
    猜你喜欢
    • 2018-08-21
    • 2019-03-16
    • 2018-03-30
    • 2018-05-24
    • 1970-01-01
    • 2018-04-20
    • 2019-03-23
    • 1970-01-01
    • 2018-11-17
    相关资源
    最近更新 更多