【问题标题】:Sparklyr: how to explode a list column into their own columns in Spark table?Sparklyr:如何将列表列分解为 Spark 表中自己的列?
【发布时间】:2017-09-21 05:36:34
【问题描述】:

我的问题与here 中的问题类似,但我在实现答案时遇到了问题,我无法在该线程中发表评论。

所以,我有一个包含嵌套数据的大 CSV 文件,其中包含由空格分隔的 2 列(比如第一列是 Y,第二列是 X)。 X 列本身也是一个逗号分隔值。

21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665,...
35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823,...
15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842,...
...

我想使用sparklyr 将此 CSV 读入 2 个不同的 Spark 表。

到目前为止,这是我一直在做的:

  1. 使用spark_read_csv将所有CSV内容导入Spark数据表

    df = spark_read_csv(sc, path = "path", name = "simData", delimiter = " ", header = "false", infer_schema = "false")

    结果是一个名为 simData 的 Spark 表,包含 2 列:C0C1

  2. 使用dplyr选择第一列和第二列,然后分别注册为新表Y和X

    simY <- df %>% select(C0) %>% sdf_register("simY")

    simX <- df %>% select(C1) %>% sdf_register("simX")

  3. 使用ft_regex_tokenizer函数拆分simX中的值,对于写在here中的答案。

    ft_regex_tokenizer(input_DF, input.col = "COL", output.col = "ResultCols", pattern = '\\###')

但是当我尝试使用dplyr head 时:

Source:   query [6 x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

        Result
        <list>
1 <list [789]>
2 <list [789]>
3 <list [789]>
4 <list [789]>
5 <list [789]>
6 <list [789]>

我想把它变成一个新的 Spark 表并将类型转换为 double。有没有办法做到这一点? 我已经考虑将collect 数据转换为 R(使用dplyr),转换为矩阵,然后为每一行执行strsplit,但我认为这不是一个解决方案,因为 CSV 大小可以达到 40GB .

编辑:Spark 版本是 1.6.0

【问题讨论】:

  • 您尝试过 sdf_separate_columns 吗?请参阅其他讨论。

标签: r apache-spark dplyr tidyr sparklyr


【解决方案1】:

假设您的数据如下所示

library(dplyr)
library(sparklyr)

df <- data.frame(text = c("1.0,2.0,3.0", "4.0,5.0,6.0"))
sdf <- copy_to(sc, df, "df", overwrite = TRUE)

你已经创建了spark_connection,你可以关注

n <- 3

# There is no function syntax for array access in Hive
# so we have to build [] expressions
# CAST(... AS double) could be handled in sparklyr / dplyr with as.numeric
exprs <- lapply(
  0:(n - 1), 
  function(i) paste("CAST(bits[", i, "] AS double) AS x", i, sep=""))

sdf %>%
  # Convert to Spark DataFrame
  spark_dataframe() %>% 
  # Use expression with split and explode
  invoke("selectExpr", list("split(text, ',') AS  bits")) %>%
  # Select individual columns
  invoke("selectExpr", exprs) %>%
  # Register table in the metastore ("registerTempTable" in Spark 1.x)
  invoke("createOrReplaceTempView", "exploded_df")

并使用dplyr::tbl 取回sparklyr 对象:

tbl(sc, "exploded_df")
Source:   query [2 x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE

     x0    x1    x2
  <dbl> <dbl> <dbl>
1     1     2     3
2     4     5     6

在最新版本中你也可以使用sdf_separate_column:

sdf %>% 
  mutate(text=split(text, ",")) %>% 
  sdf_separate_column("text", paste0("x", 0:2))
# Source:   table<sparklyr_tmp_87125f13b89> [?? x 4]
# Database: spark_connection
  text       x0    x1    x2   
  <list>     <chr> <chr> <chr>
1 <list [3]> 1.0   2.0   3.0  
2 <list [3]> 4.0   5.0   6.0  

【讨论】:

  • 哇,这就像魔术一样!实际上我跳过了那个错误部分,直接使用sdf_register,它正在工作。之后我再次检查了这个问题,发现了registerTempTable。啊,是的,我使用的是 Spark 1.6.0。我还稍微修改了您的代码,并使其适用于 Y 部分。非常感谢!
猜你喜欢
  • 2017-09-22
  • 1970-01-01
  • 2017-06-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-28
  • 2018-10-20
  • 2016-12-04
相关资源
最近更新 更多