我想出了一个运行良好的解决方案。我使用 highcharter 进行绘图。我认为数据分区的管理方式还可以进一步改进:对于列的最小值与最大值相差很大的大型数据集,目前的做法可能不是最具可扩展性的方案。可能还需要加入一些条件检查,但作为示例,这就是我的做法。注:示例取自 https://rpubs.com/mcocam12/KDF_byHand,非常感谢 Marc 提供的示例。
数据:
# Build a 1,500-row sample data set by stacking iris ten times,
# then push it to Spark with one row per partition (as in the original post).
stacked <- replicate(10, iris, simplify = FALSE)
df <- do.call(rbind, stacked)
sdf <- SparkR::createDataFrame(df)
sdf <- SparkR::repartition(sdf, nrow(sdf))
功能:
#' Gaussian kernel density estimate of the first column of a SparkDataFrame.
#'
#' @param sdf SparkDataFrame; the density is estimated over its FIRST column.
#'   (The original default `sdf = sdf` was self-referential and would error
#'   if ever relied upon, so the argument is now simply required.)
#' @param num_values Number of observations in `sdf`, used to normalise the
#'   summed kernels into a density.
#' @param h Kernel bandwidth.
#' @param pad Padding added below the min / above the max of the data so the
#'   density tails decay toward zero (was hard-coded to 5).
#' @param step Spacing of the evaluation grid (was hard-coded to 0.01).
#' @return SparkDataFrame with columns Range, bell_sum, kernel_density,
#'   sorted ascending by Range. Not collected; caller collects.
gen_sdf_kernel_density_points <- function(sdf, num_values, h = 1,
                                          pad = 5, step = 0.01) {
  x_col <- SparkR::colnames(sdf)[1]

  # `[[` gives a Column by (string) name directly — no need to paste code
  # text together and eval(parse()) it.
  min_max_sdf <- SparkR::collect(
    SparkR::agg(
      sdf,
      min = SparkR::min(sdf[[x_col]]),
      max = SparkR::max(sdf[[x_col]])
    )
  )

  # Evaluation grid, padded beyond the observed range.
  Range <- data.frame(
    Range = seq(min_max_sdf$min - pad, min_max_sdf$max + pad, by = step)
  )
  RangeSDF <- SparkR::createDataFrame(Range)
  # NOTE(review): partitioning of RangeSDF is left to Spark's default here;
  # for very wide ranges an explicit repartition keyed on nrow(Range) may
  # scale better — worth benchmarking on real data.

  # One row per (observation, grid point) pair.
  tst <- SparkR::crossJoin(sdf, RangeSDF)

  # Gaussian kernel centred on each observation, evaluated at each grid point.
  tst$density <- exp(-(tst$Range - tst[[x_col]])^2 / (2 * h^2)) /
    (h * sqrt(2 * pi))

  # Sum the per-observation kernels at each grid point, then normalise by n.
  gb_df <- SparkR::groupBy(tst, tst$Range)
  densities2 <- SparkR::agg(gb_df, bell_sum = SparkR::sum(tst$density))
  densities2 <- SparkR::withColumn(
    densities2, "kernel_density", densities2$bell_sum / num_values
  )
  SparkR::arrange(densities2, SparkR::asc(densities2$Range))
}
#' Build one highcharter areaspline density plot per collected result.
#'
#' @param res Named list of data.frames (one per variable), each with columns
#'   Range and kernel_density, as produced by collecting the output of
#'   gen_sdf_kernel_density_points(). Names become the plot titles.
#' @return Unnamed list of highchart objects, in the same order as `res`.
gen_den_plots_from_spark_res <- function(res) {
  big_out <- lapply(seq_along(res), function(i) {
    var_name <- names(res)[i]
    rdf <- res[[i]]

    # Construct the data.frame directly; data.frame(cbind(...)) first builds
    # a matrix, which copies needlessly and coerces types.
    tmp <- data.frame(x = rdf$Range, y = rdf$kernel_density)
    series_data <- highcharter::list_parse(tmp)

    hc <- highcharter::highchart() %>%
      highcharter::hc_series(  # namespaced, consistent with the other calls
        list(
          name = "Density Estimate",
          data = series_data,
          type = "areaspline",
          marker = list(enabled = FALSE),
          # Vertical gradient for the line colour.
          color = list(
            linearGradient = list(x1 = 0, y1 = 1, x2 = 0, y2 = 0),
            stops = list(
              list(0, "transparent"),
              list(0.33, "#0000FF1A"),
              list(0.66, "#0000FF33"),
              list(1, "#ccc")
            )
          ),
          # Vertical gradient for the area fill.
          fillColor = list(
            linearGradient = list(x1 = 0, y1 = 1, x2 = 0, y2 = 0),
            stops = list(
              list(0, "transparent"),
              list(0.1, "#0000FF1A"),
              list(0.5, "#0000FF33"),
              list(1, "#0000FF80")
            )
          )
        )
      ) %>%
      highcharter::hc_title(
        text = paste0("Density Plot For: ", snakecase::to_title_case(var_name))
      )
    hc
  })
  big_out
}
#' Lay a list of highcharts out in a browsable grid.
#'
#' @param tres_out List of highchart objects.
#' @param ncol Number of plots per row.
#' @return A browsable htmltools object containing the grid.
make_hc_grid <- function(tres_out, ncol = 2) {
  grid <- highcharter::hw_grid(tres_out, rowheight = 450, ncol = ncol)
  htmltools::browsable(grid)
}
用法:
# Keep only columns with plottable types (drop character / time / logical).
# which(!t %in% bad) is the direct form of which(t %in% setdiff(t, bad)).
tmp_types <- SparkR::coltypes(sdf)
good_cols_idx <- which(!tmp_types %in% c("character", "POSIXct", "POSIXlt", "logical"))
nrows_sdf <- SparkR::count(sdf)

if (length(good_cols_idx) >= 1) {
  good_names <- SparkR::colnames(sdf)[good_cols_idx]
  out <- lapply(good_names, function(col_name) {
    # Select a single column first, otherwise the cross join inside the
    # density helper becomes too big.
    one_col_sdf <- SparkR::select(sdf, col_name)
    res_sdf <- gen_sdf_kernel_density_points(sdf = one_col_sdf,
                                             num_values = nrows_sdf)
    SparkR::collect(res_sdf)
  }) %>% stats::setNames(good_names)
} else {
  # Previously `out` was simply left undefined, causing a cryptic failure
  # at plot time; fail early with a clear message instead.
  stop("No numeric columns available for density estimation.", call. = FALSE)
}
绘图:
# Build one chart per collected density result, then arrange them in a grid.
tres<- gen_den_plots_from_spark_res(res=out)
all_plots<- make_hc_grid(tres_out = tres)
# View Result
all_plots
预期结果:
这一切都还有改进的空间……如果你有更好的想法,我很乐意听取。
最好,
NF