【问题标题】:R: re-writing a loop using the "doparallel", "foreach" and "purrr" libraries(R:使用“doparallel”、“foreach”和“purrr”库重写循环)
【发布时间】:2021-09-16 14:43:12
【问题描述】:

我在 R 中编写了以下代码,它对一些人为生成的数据循环执行一系列数据处理操作(最终输出称为“final_results”):

#load library
    library(dplyr)

library(data.table)

set.seed(123)

# Create some data for this example.
a1 <- rnorm(1000, 100, 10)
b1 <- rnorm(1000, 100, 5)
c1 <- sample.int(1000, 1000, replace = TRUE)
train_data <- data.frame(a1, b1, c1)

# Number of random binning schemes to evaluate.
n_iterations <- 10

# One summary table per iteration is stored in a preallocated list. The list is
# bound and reshaped once after the loop instead of calling rbind() and dcast()
# on every pass.
results_list <- vector("list", n_iterations)

for (i in seq_len(n_iterations)) {
  # Random thresholds. The draw order (random_1..random_4 here, split_1..split_3
  # below) is unchanged, so the results under set.seed(123) stay the same.
  random_1 <- runif(1, 80, 120)
  random_2 <- runif(1, random_1, 120)
  random_3 <- runif(1, 85, 120)
  random_4 <- runif(1, random_3, 120)

  # Bin the data according to the random criteria.
  train_data <- train_data %>%
    mutate(
      cat = ifelse(
        a1 <= random_1 & b1 <= random_3,
        "a",
        ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")
      )
    )
  train_data$cat <- as.factor(train_data$cat)

  # One table per bin.
  a_table <- train_data %>%
    filter(cat == "a") %>%
    select(a1, b1, c1, cat)

  b_table <- train_data %>%
    filter(cat == "b") %>%
    select(a1, b1, c1, cat)

  c_table <- train_data %>%
    filter(cat == "c") %>%
    select(a1, b1, c1, cat)

  # Random quantile level for each bin.
  split_1 <- runif(1, 0, 1)
  split_2 <- runif(1, 0, 1)
  split_3 <- runif(1, 0, 1)

  # For each bin, compute the quantile of c1 at that bin's random level
  # ("quant"). The level is random, not a fixed 60th percentile.
  table_a <- data.frame(
    a_table %>%
      group_by(cat) %>%
      mutate(quant = quantile(c1, probs = split_1))
  )

  table_b <- data.frame(
    b_table %>%
      group_by(cat) %>%
      mutate(quant = quantile(c1, probs = split_2))
  )

  table_c <- data.frame(
    c_table %>%
      group_by(cat) %>%
      mutate(quant = quantile(c1, probs = split_3))
  )

  # "diff" is 1 when the bin's quantile is bigger than the row's c1, else 0.
  table_a$diff <- ifelse(table_a$quant > table_a$c1, 1, 0)
  table_b$diff <- ifelse(table_b$quant > table_b$c1, 1, 0)
  table_c$diff <- ifelse(table_c$quant > table_c$c1, 1, 0)

  # Stack all bins.
  final_table <- rbind(table_a, table_b, table_c)

  # For each bin, the average of "diff".
  final_table_2 <- data.frame(
    final_table %>%
      group_by(cat) %>%
      summarize(mean = mean(diff))
  )

  # Add the overall mean as an extra "total" row.
  final_table_2 <- data.frame(
    final_table_2 %>%
      add_row(cat = "total", mean = mean(final_table$diff))
  )

  # Record the random criteria next to the results for reference.
  final_table_2$random_1 <- random_1
  final_table_2$random_2 <- random_2
  final_table_2$random_3 <- random_3
  final_table_2$random_4 <- random_4
  final_table_2$split_1 <- split_1
  final_table_2$split_2 <- split_2
  final_table_2$split_3 <- split_3
  final_table_2$iteration_number <- i

  results_list[[i]] <- final_table_2
}

# Long table with one row per (iteration, bin); rbindlist() returns a data.table.
results_table <- rbindlist(results_list)

# Wide table: one row per iteration, one column per bin plus "total".
final_results <- dcast(
  results_table,
  iteration_number + random_1 + random_2 + random_3 + random_4 +
    split_1 + split_2 + split_3 ~ cat,
  value.var = "mean"
)

上面的循环运行得非常好——但我正在尝试更多地了解 R,并尝试使用其他库(例如 "doParallel"、"foreach" 和 "purrr")中的函数重新编写此循环。

选项 1:

我在 R 中遇到了以下代码,它显示了使用“purrr”库编写循环的通用模板(显然“map_df”是一个使用循环代码的函数):

#option 1
library(dplyr)
library(purrr)
library(tictoc)


# Data generating process (replace with your own): returns a tibble with two
# columns, x and y, each holding 100 uniform random draws.
data_gen <- function() {
  n_obs <- 100
  x_draws <- runif(n_obs)
  y_draws <- runif(n_obs)
  tibble(x = x_draws, y = y_draws)
}

# Number of datasets to be generated.
N <- 10000

# tic()/toc() are optional; they only time the code between them.
tic('method A')
# One single-row piece per value of i; data_gen() is called once per piece and
# map_df() row-binds the generated tibbles.
index_tbl <- tibble(i = 1:N)
pieces <- split(index_tbl, index_tbl$i)
output <- map_df(pieces, ~data_gen())
toc()

但是,我不确定如何调整此代码以适合我的示例。我首先创建了map_df 函数:

#create map_df function:

# One pass of the binning/quantile summary, wrapped in a zero-argument function.
#
# NOTE(review): this definition has the same name as purrr::map_df. Once it is
# defined in the workspace, a later call such as map_df(., ~data_gen()) reaches
# this zero-argument function, which matches the "unused arguments" error
# reported below.
# NOTE(review): random_1..random_4, i and (on first use) results_table are not
# created inside this function; they are looked up outside it.
#
# The last statement is an assignment, so its value (the wide table produced by
# dcast) is returned invisibly.
map_df <- function() {
    # bin data according to random criteria
    train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))

    train_data$cat = as.factor(train_data$cat)

    # new splits: one table per bin
    a_table = train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)

    b_table = train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)

    c_table = train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)

    # random quantile level for each bin
    split_1 =  runif(1,0, 1)
    split_2 =  runif(1, 0, 1)
    split_3 =  runif(1, 0, 1)

    # calculate the quantile of c1 ("quant") for each bin at that bin's random
    # level (split_1..split_3); the level is random, not a fixed 60th percentile

    table_a = data.frame(a_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_1)))

    table_b = data.frame(b_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_2)))

    table_c = data.frame(c_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_3)))




    # create a new variable ("diff") that is 1 when the quantile is bigger than the value of "c1", else 0
    table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
    table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
    table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)

    # group all tables

    final_table = rbind(table_a, table_b, table_c)

    # create a table: for each bin, calculate the average of "diff"
    final_table_2 = data.frame(final_table %>%
                                   group_by(cat) %>%
                                   summarize(
                                       mean = mean(diff)
                                   ))

    # add "total mean" to this table
    final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))

    # format this table: add the random criteria to this table for reference
    final_table_2$random_1 = random_1

    final_table_2$random_2 = random_2

    final_table_2$random_3 = random_3

    final_table_2$random_4 = random_4

    final_table_2$split_1 = split_1

    final_table_2$split_2 = split_2

    final_table_2$split_3 = split_3

    final_table_2$iteration_number = i


    # append this pass to results_table (a local copy; the outer object is not modified)
    results_table <- rbind(results_table, final_table_2)

    # reshape to wide: one row per iteration, one column per bin plus "total"
    final_results = dcast(setDT(results_table), iteration_number + random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var = 'mean')
}

但是当我尝试运行通用模板时,它会产生以下错误:

# Data generating process as posted in the question.
# NOTE(review): the arguments inside tibble() are not separated by commas, so
# this definition does not parse as written; the last argument also wraps the
# columns in a data.frame instead of letting tibble() hold them directly.
data_gen <- function(){ #here you insert your data generating process
    tibble(
        # create some data for this example
        a1 = rnorm(1000,100,10)
        b1 = rnorm(1000,100,5)
        c1 = sample.int(1000, 1000, replace = TRUE)
        train_data = data.frame(a1,b1,c1)
    )
}

N <- 10000 # number of datasets to be generated


tic('method A')  # not necessary, measures the time of the code between 'tic' and 'toc'
# Build a one-column tibble of indices, split it into one piece per index and
# call data_gen() once per piece through map_df().
# NOTE(review): if a user-defined zero-argument map_df() exists in the
# workspace, this call reaches it rather than purrr::map_df, which matches the
# "unused arguments" error quoted after this snippet.
output <- tibble(
    i = 1:N
) %>%
    split(.$i) %>%
    map_df(
        ~data_gen()
    )
toc() 

Error in map_df(., ~data_gen()) : unused arguments (., ~data_gen())

有人知道为什么会产生这个错误吗?

选项 2

我不确定如何在我的示例中使用“doParallel”和“foreach”库。似乎所有带有“doParallel”的示例都要求用户首先定义他们希望计算机使用的“核心”数量:

 library(doParallel)
 # Create a cluster with 2 workers.
 cl <- makeCluster(2)
 # Register the cluster as the parallel backend for foreach's %dopar%.
 registerDoParallel(cl)

最后,用户必须指示计算机停止该进程:

# Shut down the workers created by makeCluster() once the parallel work is done.
stopCluster(cl)

除此之外,我不确定如何使用“doParallel”和“foreach”库来使我的示例受益。

谁能给我看看这个? 谢谢

【问题讨论】:

  • 您的 data_gen 函数本身存在一些问题,而 map_df 是 purrr 中的一个函数,因此您可能必须为自己的函数另起一个新名称。data_gen 中调用了 tibble,但各列之间没有用 , 分隔,之后又转换为 data.frame。我猜这些是笔误
  • 修复后,使用 replicate 或 purrr::rerun,例如 N %>% rerun(data_gen())
  • 要返回单个数据集,请在末尾使用 bind_rows:N %>% rerun(data_gen()) %>% bind_rows(.id = 'grp')
  • 你可以查看我的更新

标签: r loops for-loop dplyr parallel-processing


【解决方案1】:

data_gen 函数中有一些潜在的拼写错误。

  • 创建 tibble 时,各列之间没有用 , 分隔
 tibble(
        # create some data for this example
        a1 = rnorm(1000,100,10) ####
        b1 = rnorm(1000,100,5)####
      ...
  • 函数最好不要与已经存在的函数同名——map_df 已经是 purrr 中的函数

如果我们想多次执行该函数,请使用 base R 的 replicate 或 purrr::rerun

-功能

#' Generate one synthetic data set (insert your own data generating process).
#'
#' @param n Number of rows to generate. Defaults to 1000, which reproduces the
#'   previously hard-coded size, so existing `data_gen()` calls are unaffected.
#' @return A tibble with `n` rows and three columns: `a1` (normal, mean 100,
#'   sd 10), `b1` (normal, mean 100, sd 5) and `c1` (integers sampled with
#'   replacement from 1..1000).
data_gen <- function(n = 1000) {
  stopifnot(is.numeric(n), length(n) == 1, !is.na(n), n >= 0)

  tibble::tibble(
    a1 = rnorm(n, 100, 10),
    b1 = rnorm(n, 100, 5),
    # The first argument is the range being sampled from, the second the number
    # of draws; only the number of draws follows `n`.
    c1 = sample.int(1000, n, replace = TRUE)
  )
}

-使用的包

library(purrr)
library(dplyr)
library(doSNOW)
library(parallel)

-顺序运行

# Number of data sets to generate.
N <- 10

# Call data_gen() N times and stack the results; `grp` records which call each
# row came from. map() over seq_len(N) replaces purrr::rerun(), which is
# deprecated in current purrr releases.
out <- seq_len(N) %>%
  map(~ data_gen()) %>%
  bind_rows(.id = "grp")

-并行运行

# Use every detected core; detectCores() may return NA, so fall back to one.
no_of_cores <- max(1L, detectCores(), na.rm = TRUE)
cl <- makeSOCKcluster(no_of_cores)
registerDoSNOW(cl)

# Each task builds one data set with data_gen(); the pieces are row-bound into
# a single table. `finally` stops the workers even when a task fails, so no
# worker processes are left behind.
out2 <- tryCatch(
  foreach(
    i = seq_len(N),
    .combine = "rbind",
    .packages = "tibble",
    .multicombine = TRUE
  ) %dopar% {
    data_gen()
  },
  finally = stopCluster(cl)
)

-检查行数

nrow(out)
#[1] 10000
nrow(out2)
#[1] 10000

上面的代码只是为了展示如何顺序或并行地运行简单的函数 data_gen。对于 OP 的完整功能,我们可以在外部函数 (map_new_fn) 内部调用 data_gen(),然后并行或顺序调用该函数

#' Run one simulation pass on a freshly generated data set.
#'
#' Draws random thresholds, bins the rows of `data_gen()` into categories
#' "a", "b" and "c", and computes for every bin the share of rows whose `c1`
#' lies below a randomly chosen quantile of that bin.
#'
#' @param i Optional label stored in the `iteration_number` column. Defaults to
#'   `NA_integer_` so that existing zero-argument calls keep working.
#' @return A one-row data.table holding `iteration_number`, the random
#'   thresholds (`random_1`..`random_4`), the quantile levels
#'   (`split_1`..`split_3`), one column per non-empty bin and a `total` column.
map_new_fn <- function(i = NA_integer_) {
  train_data <- data_gen()

  # Draw the thresholds here. The previous version never created
  # random_1..random_4 (nor i) and read them from outside the function, so
  # every call reused whatever values happened to exist there and failed when
  # none did (presumably the case on fresh parallel workers).
  random_1 <- runif(1, 80, 120)
  random_2 <- runif(1, random_1, 120)
  random_3 <- runif(1, 85, 120)
  random_4 <- runif(1, random_3, 120)

  # Bin the data according to the random criteria.
  train_data <- train_data %>%
    mutate(
      cat = ifelse(
        a1 <= random_1 & b1 <= random_3,
        "a",
        ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")
      )
    )
  train_data$cat <- as.factor(train_data$cat)

  # Random quantile level for each bin.
  split_1 <- runif(1, 0, 1)
  split_2 <- runif(1, 0, 1)
  split_3 <- runif(1, 0, 1)

  # Helper: rows of one bin plus "quant" (the bin's quantile of c1 at level
  # `prob`) and "diff" (1 when quant is bigger than the row's c1, else 0).
  flag_below_quantile <- function(category, prob) {
    bin <- data.frame(
      train_data %>%
        filter(cat == category) %>%
        select(a1, b1, c1, cat) %>%
        group_by(cat) %>%
        mutate(quant = quantile(c1, probs = prob))
    )
    bin$diff <- ifelse(bin$quant > bin$c1, 1, 0)
    bin
  }

  # Stack all bins.
  final_table <- rbind(
    flag_below_quantile("a", split_1),
    flag_below_quantile("b", split_2),
    flag_below_quantile("c", split_3)
  )

  # For each bin, the average of "diff".
  final_table_2 <- data.frame(
    final_table %>%
      group_by(cat) %>%
      summarize(mean = mean(diff))
  )

  # Add the overall mean as an extra "total" row.
  final_table_2 <- data.frame(
    final_table_2 %>%
      add_row(cat = "total", mean = mean(final_table$diff))
  )

  # Record the random criteria next to the results for reference.
  final_table_2$random_1 <- random_1
  final_table_2$random_2 <- random_2
  final_table_2$random_3 <- random_3
  final_table_2$random_4 <- random_4
  final_table_2$split_1 <- split_1
  final_table_2$split_2 <- split_2
  final_table_2$split_3 <- split_3
  final_table_2$iteration_number <- i

  # Reshape to wide: one row, one column per bin plus "total". The functions
  # are namespace-qualified so the call works without data.table attached.
  data.table::dcast(
    data.table::as.data.table(final_table_2),
    iteration_number + random_1 + random_2 + random_3 + random_4 +
      split_1 + split_2 + split_3 ~ cat,
    value.var = "mean"
  )
}

-顺序运行

# Call map_new_fn() N times and stack the one-row results; `grp` records which
# call each row came from. bind_rows() fills a bin column with NA for any call
# that did not produce it. map() over seq_len(N) replaces purrr::rerun(), which
# is deprecated in current purrr releases.
out1_new <- seq_len(N) %>%
  map(~ map_new_fn()) %>%
  bind_rows(.id = "grp")

-并行运行

# Use every detected core; detectCores() may return NA, so fall back to one.
no_of_cores <- max(1L, detectCores(), na.rm = TRUE)
cl <- makeSOCKcluster(no_of_cores)
registerDoSNOW(cl)

# Each task runs one simulation pass and returns a one-row table.
# The results are combined with rbindlist(fill = TRUE) rather than rbind: a bin
# can be empty for some random thresholds, in which case that task's table has
# no column for it, and fill = TRUE pads the gap with NA instead of failing on
# mismatched columns.
# `finally` stops the workers even when a task fails.
out2_new <- tryCatch(
  foreach(
    i = seq_len(N),
    .combine = function(...) data.table::rbindlist(list(...), fill = TRUE),
    .export = c("data_gen", "map_new_fn"),
    .packages = c("tibble", "dplyr", "data.table"),
    .multicombine = TRUE
  ) %dopar% {
    map_new_fn()
  },
  finally = stopCluster(cl)
)

-检查输出的行数

nrow(out1_new)
[1] 10
nrow(out2_new)
[1] 10

【讨论】:

  • 感谢您的精彩回答!当我尝试运行“out2_new”时出现一些错误:调用组合函数时出错: 调用组合函数时出错: 调用组合函数时出错:
  • 你知道为什么会这样吗?非常感谢您的帮助!
  • @stats555 我确实测试了代码,但没有发现这个错误
猜你喜欢
  • 2015-03-17
  • 2020-06-07
  • 2017-10-19
  • 2020-07-13
  • 2016-05-09
  • 1970-01-01
  • 1970-01-01
  • 2015-08-31
  • 2018-03-27
相关资源
最近更新 更多