【问题标题】:Run breakpoint (lm) detection in parallel in R在 R 中并行运行断点 (lm) 检测
【发布时间】:2019-03-12 11:58:19
【问题描述】:

我在 R 中进行了大约 80000 个时间序列断点检测计算。我有所有这些极其不同的时间序列,我无法应用 ARIMA 模型,所以我正在计算每个时间序列的线性模型,然后提取断点并使用拟合结果的回归来计算来自最后一个断点的趋势。

在上面的示例中,算法将检测三个断点(一个倾斜,一个相当平坦,一个下降)。它非常适合我的需求,但每周按顺序运行一次 80k 断点计算开销太大,因此我试图通过在 R 中使用并行处理来实现这一点。

在这个例子中(在下面找到数据链接)我正在按顺序计算断点,所有 88k 大约需要 24 小时。

df.subset <- read.csv("dfsubset.csv)"
start <- Sys.time()

All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))

Sys.time() - start

在这段代码 sn-p 中,我对 10 个时间序列(在我的 Mac 上)运行检测,耗时 47 秒。我猜想并行化应该将这个基准测试时间减少到大约 1/4 pf 时间。

下面我列出了我尝试并行计算的三种方法,但我无法让嵌套应用在并行设置中工作。

带并行包

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(tidyr); library(magrittr)})

myfunction <- function(df.subset) {
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

clusterExport(clus, "myfunction")

do.call(bind_rows, parApply(clus, df.subset, 1,{function(r) { 
myfunction(r[1]) }}))

使用 multidplyr 包:

library(multidplyr)
cluster <- create_cluster(4)
set_default_cluster(cluster)

four <- function(x) {
All.Breakpoints <- x %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

cluster_assign_value(cluster, 'four', four)
save <- df.subset %>% partition(CONC_ID) %>% map(four(.))

与并行包但其他分组

library(parallel)
cl <- detectCores()

group <- df.subset %>% group_by(CONC_ID) %>% group_indices
df.subset <- bind_cols(tibble(group), df.subset)

cluster <- create_cluster(cores = cl)

by_group <- df.subset %>%
partition(group, cluster = cluster)

by_group %>%
# Assign libraries
cluster_library("tidyr") %>%
cluster_library("dplyr") %>%
cluster_library("strucchange") %>%
cluster_library("purrr") %>%
# Assign values (use this to load functions or data to each core)
cluster_assign_value("df.subset", df.subset) 

cluster_eval(by_group, search())[[1]] # results for first cluster shown 
only
cluster_get(by_group, "df.subset")[[1]]

start <- proc.time() # Start clock
sp_500_processed_in_parallel <- by_group %>% # Use by_group party_df
group_by(CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .))) 
%>%
collect() %>% # Special collect() function to recombine partitions
as_tibble()   # Convert to tibble
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel

文件链接:

http://www.filedropper.com/dfsubset

感谢您的想法和反馈!

【问题讨论】:

    标签: r dplyr breakpoints doparallel multidplyr


    【解决方案1】:

    提出问题并描述问题将在大多数情况下为您自己解决...我发现 mutate 在 R 中并行运行在任何地方都不起作用(老实说,Stackoverflow)。

    因此,我改用 do 并通过 multidplyr 分配负载,从 1 核到 4 核再到 25 核时,计算时间减少了约 50%从 1 核变为 8 核时占总时间的百分比。

    代码如下。

    ## parallel
    cl <- detectCores()
    cl
    
    df.cluster <- df.subset
    
    cluster <- create_cluster(cores = cl)
    cluster
    
    by_group <- df.cluster %>%
    partition(CONC_ID, cluster = cluster)
    by_group
    
    by_group %>%
    
    # Assign libraries
    cluster_library("strucchange")
    cluster_eval(by_group, search())[[1]] # results for first cluster shown only
    
    start <- proc.time() # Start clock
    
    cluster.processed <- by_group %>%
                         do(model = breakpoints(ACT_QTY_new ~ Index, data = .)) %>%
                         collect()
    
    time_elapsed_parallel <- proc.time() - start # End clock
    time_elapsed_parallel
    
    rm(by_grou)
    gc()
    
    Predictions <- cluster.processed %>%
    mutate(SegmentedForecast = map(model, fitted))
    df.fitted.vector <- as.data.frame(rowwise(Predictions[,3])) . 
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-08-22
      • 1970-01-01
      • 2016-06-27
      • 2021-11-13
      • 2016-06-25
      • 2021-08-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多