【问题标题】:R - read csv by chunks process them in parallelR - 按块读取 csv 并行处理它们
【发布时间】:2017-06-07 09:49:02
【问题描述】:

如何按块读取 CSV 文件(它的大小太大而无法一次读取),并使用 parallel 包处理所有块?假设我想按块计算一列的平均值。

如果没有parallel,我会使用这样的东西:

library(readr)

f <- function(df_chunk, pos) mean(df_chunk$'mpg')

res <- read_csv_chunked(readr_example("mtcars.csv"), DataFrameCallback$new(f), chunk_size=10)

结果是:

> res
      [,1]
[1,] 20.37
[2,] 19.89
[3,] 20.39
[4,] 18.20

所以我希望这 4 个平均值由子进程计算。实现这一点的一些代码可能是这样的:

library(foreach)
library(doParallel)

registerDoParallel(6)

calc_avg <- function (iterable_df_chunks) {
  foreach(df_chunk = iterable_df_chunks, .combine = c) %dopar%
    mean(df_chunk$'mpg')
}

calc_avg(< some code with read_csv_chunked() >)

stopImplicitCluster()

谢谢!

【问题讨论】:

    标签: r parallel-processing chunks


    【解决方案1】:

    好的,我找到了一个可行的解决方案。函数load_CPU 只进行一些 CPU 密集型计算来检查子进程是否真的完成了这项工作:

    load_CPU <- function(n){
      i=3
      v=c(2)
      while (length(v)<=n-1){
    
        if (all((i%%v[v<ceiling(sqrt(i))])!=0)){ 
          v=c(v,i)
        }
        i=i+2;
      }
      return(v)
    }
    
    calc_avg <- function (path) foreach(y = read_csv_chunked(path, ListCallback$new(function (x, pos) x),
        chunk_size = 10), .combine = rbind, .export=c('load_CPU')) %dopar% {
            load_CPU(10000)
            mean(y$'mpg')
        }
    
    calc_avg(readr_example("mtcars.csv"))
    

    结果是:

              [,1]
    result.1 20.37
    result.2 19.89
    result.3 20.39
    result.4 18.20
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-05
      • 2012-04-24
      • 2023-04-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多