【问题标题】:Multiple Processes Instead of for loop in R多个进程而不是R中的for循环
【发布时间】:2021-04-04 02:46:30
【问题描述】:

我希望在parallel process 中运行for loop。我使用for loop R 代码得到的结果符合我的口味,但会将其应用于非常huge data,因此执行时间很慢。

library(forecast)
library(dplyr)
arima_order_results = data.frame()
seed_out2 <- c(1, 16, 170, 178, 411, 630, 661, 1242, 1625, 1901, 1926, 1927, 1928, 2170, 2779, 3687, 4139, 4583, 4825, 4828, 4829, 4827, 5103, 5211, 5509, 5561, 5569, 5679, 6344, 6490, 6943, 6944, 6945, 6946, 6948, 6950, 6951, 6952)
for (my_seed in seed_out2){
  set.seed(my_seed)
  ar1 <- arima.sim(n = 100, model=list(ar = 0.8, order = c(1, 0, 0)), sd = 1)
  ar2 <- auto.arima(ar1, ic = "aicc")
  arr <- as.data.frame(t(ar2$coef))
  if(substr(as.character(arr[1]), 1, 5) == "0.800") {

    arr <- cbind(data.frame(seed=my_seed),arr)
    print(arr)

    arima_order_results = bind_rows(arima_order_results,arr)
    # write.csv(my_seed, paste0(arr, ".csv"), row.names = FALSE)

  } #else print("NOT AVAILABLE")
}

结果

#  seed       ar1
#1  170 0.8006368
#  seed       ar1
#1  411 0.8004152
#  seed       ar1
#1  630 0.8008459
#  seed       ar1
#1  661 0.8001553
#  seed       ar1 intercept
#1 1242 0.8000623 0.8474553
#  seed       ar1
#1 1625 0.8004982
#  seed       ar1
#1 1901 0.8007815
#  seed       ar1
#1 1927 0.8004587
#  seed       ar1
#1 2170 0.8003091
#  seed       ar1
#1 2779 0.8008643
#:
#:
#:
#seed      ar1
#1 5679 0.800689
#  seed     ar1 intercept
#1 6344 0.80004 0.9800426
#  seed       ar1
#1 6490 0.8004093
#  seed       ar1
#1 6948 0.8006992

我想要什么

我需要一个并行进程,它可以同时用完我的四个处理器,以便job execution will be fast when I apply it to huge data` 得到相同的结果。

看看我的尝试

library(parallel)    
library(foreach)
library(forecast)
library(dplyr)
library(doSNOW)
cl <- parallel::makeCluster(detectCores(), type = "SOCK")   
doSNOW::registerDoSNOW(cl)
arima_order_results = data.frame()
seed_out2 <- c(1, 16, 170, 178, 411, 630, 661, 1242, 1625, 1901, 1926, 1927, 1928, 2170, 2779, 3687, 4139, 4583, 4825, 4828, 4829, 4827, 5103, 5211, 5509, 5561, 5569, 5679, 6344, 6490, 6943, 6944, 6945, 6946, 6948, 6950, 6951, 6952)
lst_out <- foreach::foreach(my_seed = seq_along(seed_out2), .packages = c("dplyr", "forecast") ) %dopar% {
  set.seed(my_seed)
  ar1 <- arima.sim(n = 100, model=list(ar = 0.8, order = c(1, 0, 0)), sd = 1)
  ar2 <- auto.arima(ar1, ic = "aicc")
  arr <- as.data.frame(t(ar2$coef))
  if(substr(as.character(arr[1]), 1, 5) == "0.800") {

    arr <- cbind(data.frame(seed=my_seed),arr)
    print(arr)

    arima_order_results = bind_rows(arima_order_results,arr)
    # write.csv(my_seed, paste0(arr, ".csv"), row.names = FALSE)

  }
}

查看我的试用结果

#>lst_out
#[[1]]
#NULL

#[[2]]
#NULL

#[[3]]
#NULL

#[[4]]
#NULL
#:
#:
#:
#[[36]]
#NULL

#[[37]]
#NULL

#[[38]]
#NULL

我在 windows 上操作。

伊迪丝

我希望修改@jay.sf 答案,使其包含在一个函数中,就像我在下面提供的函数一样。

FUN1 <- function(n, ar, sd, arr, R, FUN2){
  FUN2 <- function(i, n, ar, sd, arr) {
    set.seed(i)
    ar1 <- arima.sim(n=n, model=list(ar=ar, order=c(1, 0, 0)), sd=sd)
    ar2 <- auto.arima(ar1, ic="aicc")
    (cf <- ar2$coef)
    if (length(cf) == 0) {
    rep(NA, 2)
    }
    else if (all(grepl(c("ar1|intercept"), names(cf))) &  ## using `grepl`
             substr(cf["ar1"], 1, 5) %in% "arr") { 
      c(cf, seed=i)
    }
    else {
      rep(NA, 2)
    }
  }

  seedv <- 1:R

  library(parallel)
  cl <- makeCluster(detectCores() - 1)
  clusterExport(cl, c("FUN2"), envir=environment())
  clusterEvalQ(cl, suppressPackageStartupMessages(library(forecast)))

  res <- parLapply(cl, seedv, "FUN2")

  res1 <- res[!sapply(res, anyNA)]  ## filter out NAs

  stopCluster(cl)
}
FUN1(n = 10, ar = 0.8, sd = 1, arr = 0.800, R = 1000, FUN2 = FUN2)

【问题讨论】:

  • 可能你正在寻找doRNG as here
  • 我需要的是并行处理,它会在parallel processing 的帮助下在我的MWE1 中产生相同的结果。就是这样。
  • 这两种情况是不同的,你给出的例子有两个foreach函数,必须做identical,而我的例子是我有一个for loop函数,它工作得很好但缺乏执行速度,我要求做些什么来达到这个速度。
  • 我的MWE2 只是尝试达到这样的速度,虽然速度达到了,但它没有打印任何结果
  • 只是为了记录,你看过sparkly包吗?您已表示您的目标是在应用于海量数据时更快地执行,这取决于您是否希望计算建立 Spark 连接的时间(每个会话一次),但原则上,Spark 旨在处理此类情况。

标签: r windows parallel-processing arima


【解决方案1】:

这里与我给您的previous related questions 之一的答案类似。

FUN <- function(i) {
  set.seed(i)
  ar1 <- arima.sim(n=100, model=list(ar=0.8, order=c(1, 0, 0)), sd=1)
  ar2 <- auto.arima(ar1, ic="aicc")
  cf <- ar2$coef
  ## case handling
  if (length(cf) == 0) rep(NA, 2)  ## sometimes result is `character(0)` -> NA
  else if (substr(cf[1], 1, 5) %in% "0.800") c(cf, i)  ## hit, that's what we want
  else rep(NA, 2)  ## all other cases -> NA
}

R <- 1e3  ## this would be your 1e5
seedv <- 1:R  ## or use custom seed vector

library(parallel)
cl <- makeCluster(detectCores() - 1)  ## for all cores remove `- 1`
clusterExport(cl, c("FUN"), envir=environment())
clusterEvalQ(cl, suppressPackageStartupMessages(library(forecast)))

res <- `colnames<-`(t(parSapply(cl, seedv, "FUN")), c("cf", "seed"))

stopCluster(cl)

结果

在结果中,我们要过滤掉所有带有NA 的行。

head(res[!is.na(res[,1]), ])
#             cf seed
# [1,] 0.8006368  170
# [2,] 0.8004152  411
# [3,] 0.8008459  630
# [4,] 0.8001553  661

编辑

要包含仅包含"ar1""intercept" 组合的auto.arima 结果,我们最好使用parLapply

FUN <- function(i) {
  set.seed(i)
  ar1 <- arima.sim(n=50, model=list(ar=0.8, order=c(1, 0, 0)), sd=1)
  ar2 <- auto.arima(ar1, ic="aicc")
  (cf <- ar2$coef)
  if (length(cf) == 0) {
    rep(NA, 2)
    }
  else if (all(grepl(c("ar1|intercept"), names(cf))) &  ## using `grepl`
           substr(cf["ar1"], 1, 5) %in% "0.800") { 
    c(cf, seed=i)
    }
  else {
    rep(NA, 2)
    }
}

R <- 1e4
seedv <- 1:R

library(parallel)
cl <- makeCluster(detectCores() - 1)
clusterExport(cl, c("FUN"), envir=environment())
clusterEvalQ(cl, suppressPackageStartupMessages(library(forecast)))

res <- parLapply(cl, seedv, "FUN")

res1 <- res[!sapply(res, anyNA)]  ## filter out NAs

stopCluster(cl)

这给出了列长度不等的数据帧列表,我们可以将其 mergeReduce

res2 <- Reduce(function(...) merge(..., all=T), lapply(res1, function(x) as.data.frame(t(x))))

res2[order(res2$seed), c("ar1", "intercept", "seed")]  ## some ordering
#          ar1 intercept seed
# 1  0.8000531  1.335388  290
# 3  0.8002499        NA 2154
# 10 0.8005477        NA 2888
# 11 0.8006736        NA 3203
# 15 0.8009363        NA 4415
# 14 0.8008462        NA 4572
# 4  0.8003495        NA 4726
# 9  0.8005087        NA 6241
# 2  0.8001865        NA 6417
# 13 0.8008060 -1.700587 6845
# 6  0.8003977        NA 7187
# 8  0.8004316        NA 8981
# 7  0.8004268        NA 9368
# 12 0.8007281        NA 9697
# 5  0.8003903        NA 9793

编辑2

这里有一个函数,只需要用户指定R——迭代次数。它在内部使用doParallel::registerDoParallel 定义一个隐式集群,默认情况下使用通常的detectCores() - 1,但也可以由用户指定。集群将自动停止。此外,还应用了foreach 循环。

library(forecast)
library(doParallel)

arimaze <- function(R, ncores=detectCores() - 1) {
  registerDoParallel(ncores)
  seedv <- 1:R
  FUN <- function(i) {
    set.seed(i)
    ar1 <- arima.sim(n=50, model=list(ar=0.8, order=c(1, 0, 0)), sd=1)
    ar2 <- auto.arima(ar1, ic="aicc")
    cf <- ar2$coef
    if (length(cf) == 0 | !(all(grepl(c("ar1|intercept"), names(cf))) &
                            substr(cf["ar1"], 1, 5) %in% "0.800")) {
      return(rep(NA, 3))
    } else {
      cf <- `length<-`(cf, 2)
      return(c(cf, seed=i))
    }
  }
  message('processing...')
  res <-
    foreach(i=seedv, .combine=rbind.data.frame, .packages='forecast') %dopar% 
    FUN(i)
  message(' done!\n')
  res <- `rownames<-`(res[rowSums(is.na(res)) == 0, ], NULL)
  stopImplicitCluster()
  return(setNames(res, c('ar', 'intercept', 'seed')))
}

用法

r <- arimaze(1.5e4)
# processing... done!

结果

r
#          ar intercept  seed
# 1 0.8000531  1.335388   290
# 2 0.8008060 -1.700587  6845
# 3 0.8003690 -1.443856 12137

【讨论】:

  • 800 中的 else if (substr(cf[1], 3, 5) %in% "800") c(cf, i) ## hit, that's what we want 让我感到惊讶。 0.800 会发生什么?
  • 效果和造型都很棒,就是想通过问上面的问题来学习
  • @DanielJames 啊谢谢,你原来的版本确实更好,因为我的也包括1.800,已编辑!
  • @DanielJames 谢谢,其实也是一个有趣的问题!
  • 我需要你的关注进行私人聊天。
猜你喜欢
  • 1970-01-01
  • 2020-07-13
  • 1970-01-01
  • 2021-12-13
  • 2017-11-14
  • 1970-01-01
  • 1970-01-01
  • 2020-05-20
  • 1970-01-01
相关资源
最近更新 更多