【问题标题】:R SparkR - equivalent to melt functionR SparkR - 相当于熔化功能
【发布时间】:2023-10-04 19:03:01
【问题描述】:

SparkR 库中是否有类似melt 的函数?

将 1 行 50 列的数据转换为 50 行 3 列的数据?

【问题讨论】:

  • @sramalingam24 这根本不是真的。一般来说,设计用于data.frame 的实用程序与SparkDatafarme 不兼容。
  • @sramalingam24 好吧,如果你这样做,数据将不再被分发,并且可能会使你的程序崩溃(如果首先使用 SparkR 有意义的话)。一般来说,没有一种高效且可扩展的方式来将任意 R 函数与 SparkR 数据结构结合使用。

标签: r apache-spark reshape2 sparkr


【解决方案1】:

在 SparkR 中没有提供类似功能的内置函数。您可以使用explode 构建自己的

library(magrittr)

df <- createDataFrame(data.frame(
  A = c('a', 'b', 'c'),
  B = c(1, 3, 5),
  C = c(2, 4, 6)
))

melt <- function(df, id.vars, measure.vars, 
                 variable.name = "key", value.name = "value") {

   measure.vars.exploded <- purrr::map(
       measure.vars, function(c) list(lit(c), column(c))) %>% 
     purrr::flatten() %>% 
     (function(x) do.call(create_map, x)) %>% 
     explode()
   id.vars <- id.vars %>% purrr::map(column)

   do.call(select, c(df, id.vars, measure.vars.exploded)) %>%
     withColumnRenamed("key", variable.name) %>%
     withColumnRenamed("value", value.name)
}

melt(df, c("A"), c("B", "C")) %>% head()
  A key value                                                                   
1 a   B     1
2 a   C     2
3 b   B     3
4 b   C     4
5 c   B     5
6 c   C     6

或将 SQL 与 Hive 的 stack UDF 一起使用:

stack <- function(df, id.vars, measure.vars, 
                  variable.name = "key", value.name = "value") { 
  measure.vars.exploded <- glue::glue('"{measure.vars}", `{measure.vars}`') %>%  
    glue::glue_collapse(" , ") %>%
    (function(x) glue::glue(
      "stack({length(measure.vars)}, {x}) as ({variable.name}, {value.name})"
    )) %>%
    as.character()
    do.call(selectExpr, c(df, id.vars, measure.vars.exploded))
}

stack(df, c("A"), c("B", "C")) %>% head()
  A key value
1 a   B     1
2 a   C     2
3 b   B     3
4 b   C     4
5 c   B     5
6 c   C     6

相关问题:

【讨论】:

  • 非常优雅的解决方案。谢谢!我认为可以通过将id.vars &lt;- id.vars %&gt;% purrr::map(column) do.call(select, c(df, id.vars, measure.vars.exploded)) 替换为df %&gt;% select(c(id.vars, measure.vars.exploded)) 来进一步简化它,至少它适用于我的RSpark (2.4) 版本