【问题标题】:SparkR:从数值列生成密度数据(Generate Density Data From Numeric Columns)
【发布时间】:2021-05-29 03:42:32
【问题描述】:

我想知道如何生成一个数据对象,就像在 spark 数据帧的数字列上调用 stats::density(df$variable) 时得到的那样?

我正在调查SparkR::spark.lapply,但我认为我遗漏了一些东西。我在下面创建了一个小例子。如果有人知道如何并愿意帮助我,我将非常感激。

最好, NF

例子:

# Local (non-Spark) baseline: kernel density estimates for every numeric
# column of a data.frame.
df<- iris

# Return a named list of stats::density() results, one per numeric/integer
# column of `df`; non-numeric columns (factor, character, ...) are skipped.
gen_density_data<- function(df){
  # is.numeric() is TRUE for both "numeric" and "integer" columns, so this
  # selects the same columns as the original class() check, and `df[[i]]`
  # replaces the fragile eval(parse(text = ...)) string construction.
  good_cols<- which(vapply(df, is.numeric, logical(1)))
  tres<- lapply(good_cols, function(i){
    stats::density(df[[i]])
  })
  return(tres)
}

res<- gen_density_data(df)

# And for Spark:
sdf<- SparkR::createDataFrame(iris)
# Attempt to compute stats::density() for each non-character/non-temporal
# column of a Spark DataFrame by distributing the work with spark.lapply.
gen_spark_density_data<- function(sdf){
  tmp_types<- SparkR::coltypes(sdf)
  # Keep indices of columns whose Spark type is not character/POSIXct/POSIXlt/logical.
  good_cols_idx<- which(tmp_types %in% setdiff(tmp_types, c("character", "POSIXct", "POSIXlt", "logical")))
  if(length(good_cols_idx)>=1){
    # NOTE(review): spark.lapply ships this closure (including `sdf`) to the
    # workers, but a SparkDataFrame is a handle bound to the driver's Spark
    # session -- presumably the workers cannot evaluate `sdf$<col>`, which
    # appears to be why the call below errors. Collecting the column to the
    # driver (or computing the density with Spark column ops) avoids this --
    # TODO confirm against the actual error message.
    tres<- SparkR::spark.lapply(good_cols_idx, function(x){
      eval(parse(text=paste0("stats::density(sdf$", colnames(sdf)[x], ")")))
    })
    return(tres)
  }
}

tst<- gen_spark_density_data(sdf=sdf)    # This is where it throws errors. 

【问题讨论】:

    标签: r apache-spark sparkr kernel-density


    【解决方案1】:

    我想出了一个运行良好的解决方案,绘图部分使用了 highcharter。我认为数据分区的管理方式还可以进一步改进:目前,对于某列最小值与最大值相差很大的大型数据集,这可能不是可扩展性最好的方案。可能还需要加入一些条件检查,但作为示例,以下就是我的做法。注意:我参考了 https://rpubs.com/mcocam12/KDF_byHand 上的示例,非常感谢 Marc 提供的例子。

    数据:

    # Demo dataset: iris stacked 10 times (1500 rows), loaded into Spark.
    df<- do.call("rbind", replicate(10, iris, simplify = FALSE))
    sdf<- SparkR::createDataFrame(df)
    # One row per partition. NOTE(review): fine for a demo, but this many tiny
    # partitions is likely to be slow at scale -- tune for real data.
    sdf<- SparkR::repartition(sdf, nrow(sdf))
    

    函数:

    # Kernel-density estimate (Gaussian kernel, bandwidth h) for the FIRST
    # column of a Spark DataFrame, computed entirely with Spark column ops.
    #
    # Args:
    #   sdf        - SparkR DataFrame; only its first column is used.
    #   num_values - total row count of the data (normalizer for the sum).
    #   h          - kernel bandwidth (default 1).
    # Returns a SparkR DataFrame with columns Range, bell_sum, kernel_density,
    # ordered by Range (collect() it to use the values locally).
    gen_sdf_kernel_density_points<- function(sdf=sdf, num_values, h=1){
      x_col<- SparkR::colnames(sdf)[1]
      # Build Column objects directly with [[ ]] -- no eval(parse()) string
      # construction needed.
      min_max_sdf<- SparkR::collect(
        SparkR::agg(sdf,
                    min = SparkR::min(sdf[[x_col]]),
                    max = SparkR::max(sdf[[x_col]]))
      )
      # Evaluation grid: observed range padded by 5 on each side, step 0.01.
      # NOTE(review): a fixed 0.01 step makes this grid huge for wide-ranged
      # columns; partitioning/step size is the main scalability lever here.
      Range<- data.frame(Range = seq(min_max_sdf$min - 5, min_max_sdf$max + 5, 0.01))
      RangeSDF<- SparkR::createDataFrame(Range)

      # Every (data value, grid point) pair: one Gaussian bell per data value.
      tst<- SparkR::crossJoin(sdf, RangeSDF)
      tst$density<- exp(-(tst$Range - tst[[x_col]])^2 / (2 * h^2)) / (h * sqrt(2 * pi))

      # Sum the bells at each grid point, then normalize by the row count.
      gb_df<- SparkR::groupBy(tst, tst$Range)
      densities2<- SparkR::agg(gb_df, bell_sum=sum(tst$density))
      densities2<- SparkR::withColumn(densities2, "kernel_density", densities2$bell_sum / num_values)
      # SparkR::asc qualified so the function works when SparkR is not attached.
      densities2<- SparkR::arrange(densities2, SparkR::asc(densities2$Range))
      return(densities2)
    }
    
    
    # Convert collected density results into highcharter area-spline plots.
    #
    # Args:
    #   res - named list of data.frames, each with Range and kernel_density
    #         columns (collected output of gen_sdf_kernel_density_points);
    #         element names are the source column names, used in the titles.
    # Returns a list of highchart objects, one per element of res.
    gen_den_plots_from_spark_res<- function(res){
      big_out<- lapply(seq_along(res), function(i){
        var_name<- names(res)[i]
        rdf<- res[[i]]
        # data.frame() directly -- the cbind() matrix round-trip is not needed
        # for two numeric columns, and a distinct name avoids shadowing the
        # lapply index.
        tmp<- data.frame(x = rdf$Range, y = rdf$kernel_density)
        series_data<- highcharter::list_parse(tmp)

        # hc_series is namespaced like every other highcharter call here, so
        # the function also works when highcharter is not attached.
        hc<- highcharter::highchart() %>%
          highcharter::hc_series(
            list(
              name="Density Estimate",
              data =  series_data,
              type = "areaspline",
              marker = list(enabled = FALSE),
              color =  list(
                linearGradient = list(x1 = 0, y1 = 1, x2 = 0, y2 = 0),
                stops = list(
                  list(0, "transparent"),
                  list(0.33, "#0000FF1A"),
                  list(0.66, "#0000FF33"),
                  list(1, "#ccc")
                )
              ),
              fillColor = list(
                linearGradient = list(x1 = 0, y1 = 1, x2 = 0, y2 = 0),
                stops = list(
                  list(0, "transparent"),
                  list(0.1, "#0000FF1A"),
                  list(0.5, "#0000FF33"),
                  list(1, "#0000FF80")
                )
              )
            )
          )
        hc<- hc  %>%
          highcharter::hc_title(text=paste0("Density Plot For: ", snakecase::to_title_case(var_name)))

        return(hc)
      })
      return(big_out)
    }
    
    # Lay out a list of highcharts as a browsable grid (450px row height,
    # `ncol` charts per row). Returns the browsable HTML widget grid.
    make_hc_grid<- function(tres_out, ncol=2){
      chart_grid <- highcharter::hw_grid(tres_out, rowheight = 450, ncol = ncol)
      htmltools::browsable(chart_grid)
    }
    

    用法:

    tmp_types<- SparkR::coltypes(sdf)
    # Keep columns whose Spark type is not character/date-time/logical.
    # which(!x %in% bad) is the direct form of the original
    # which(x %in% setdiff(x, bad)) and selects exactly the same indices.
    good_cols_idx<- which(!tmp_types %in% c("character", "POSIXct", "POSIXlt", "logical"))
    nrows_sdf<- SparkR::count(sdf)   # total row count, computed once

    if(length(good_cols_idx)>=1){
      good_names<- SparkR::colnames(sdf)[good_cols_idx]
      # One density data.frame per numeric column, named by column.
      out<- stats::setNames(
        lapply(good_names, function(nm){
          # Select the single column first -- otherwise the cross join inside
          # gen_sdf_kernel_density_points becomes too big.
          tmpz<- SparkR::select(sdf, nm)
          SparkR::collect(gen_sdf_kernel_density_points(sdf = tmpz, num_values = nrows_sdf))
        }),
        good_names
      )
    }
    

    绘图:

    # Build one density plot per collected column, then arrange them in a grid.
    tres<- gen_den_plots_from_spark_res(res=out)
    all_plots<- make_hc_grid(tres_out = tres)
    
    # View Result
    all_plots
    

    预期结果:

    这一切都可能会得到改进......如果你有想法,我很想听听。

    最好, NF

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-10-27
      • 1970-01-01
      • 2017-05-30
      • 1970-01-01
      • 2010-10-03
      相关资源
      最近更新 更多