【问题标题】:Merging large data sets using sparkR使用 sparkR 合并大型数据集
【发布时间】:2016-01-12 02:30:15
【问题描述】:

我想知道 sparkR 是否比“常规 R”更容易合并大型数据集?我有 12 个 csv 文件,大约 500,000 行乘 40 列。这些文件是 2014 年的月度数据。我想为 2014 年制作一个文件。这些文件都有相同的列标签,我想按第一列(年份)合并。但是,某些文件的行数比其他文件多。

当我运行以下代码时:

setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014")

file_list <- list.files()

for (file in file_list){

# if the merged dataset doesn't exist, create it
if (!exists("dataset")){
dataset <- read.table(file, header=TRUE, sep="\t")
}

# if the merged dataset does exist, append to it
if (exists("dataset")){
temp_dataset <-read.table(file, header=TRUE, sep="\t")
dataset<-rbind(dataset, temp_dataset)
rm(temp_dataset)
}

}

R 崩溃了。

当我运行这段代码时:

library(SparkR)
library(magrittr)
# setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014\\Jan2014.csv")
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

Jan2014_file_path <- file.path( 'Jan2014.csv')

system.time(
housing_a_df <- read.df(sqlContext, 
                      "C:\\Users\\Anonymous\\Desktop\\Data       2014\\Jan2014.csv", 
                      header='true',  
                      inferSchema='false')
)

我收到以下错误:

   Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0        in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):

那么在 sparkR 中合并这些文件的简单方法是什么?

【问题讨论】:

  • 你读过this的答案吗?第一部分,file_list csv 文件中的所有文件吗?
  • 您说您想“按第一列合并”,但在您的示例代码中,您连接了来自不同文件的行。下面的答案(在撰写本文时)是关于 merging=joining,而不是 concatenating。
  • 以下是否有答案,回答您的问题?如果是,请接受答案。这可能会帮助其他开发人员

标签: r apache-spark sparkr


【解决方案1】:

将文件作为数据帧读取后,您可以使用 SparkR 中的 unionAll 将数据帧合并为单个数据帧。然后您可以将其写入单个 csv 文件。

示例代码

    df1 <- read.df(sqlContext, "/home/user/tmp/test1.csv", source = "com.databricks.spark.csv")
    df2 <- read.df(sqlContext, "/home/user/tmp/test2.csv", source = "com.databricks.spark.csv")
    mergedDF <- unionAll(df1, df2)
    write.df(mergedDF, "merged.csv", "com.databricks.spark.csv", "overwrite")

我已经测试并使用了它,但没有针对您的尺寸数据。 但我希望这会对你有所帮助

【讨论】:

    【解决方案2】:

    您应该阅读以下格式的 csv 文件: 参考:https://gist.github.com/shivaram/d0cd4aa5c4381edd6f85

    # Launch SparkR using 
    # ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3
    
    # The SparkSQL context should already be created for you as sqlContext
    sqlContext
    # Java ref type org.apache.spark.sql.SQLContext id 1
    
    # Load the local CSV file using `read.df`. Note that we use the CSV reader Spark package here.
    Jan2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Jan2014.csv", "com.databricks.spark.csv", header="true")
    
    Feb2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data  2014/Feb2014.csv", "com.databricks.spark.csv", header="true")
    
    #For merging / joining by year
    
    #join
       jan_feb_2014 <- join(Jan2014 , Feb2014 , joinExpr = Jan2014$year == Feb2014$year1, joinType = "left_outer")
    # I used "left_outer", so i want all columns of Jan2014 and matching of columns Feb2014, based upon your requirement change the join type. 
    #rename the Feb2014 column name year to year1, as it gets duplicated while joining. Then you can remove the column "jan_feb_2014$year1" after joining by the code, "jan_feb_2014$year1 <- NULL"
    

    这样你就可以逐个加入文件了。

    【讨论】:

    • 加入是否将列添加到来自其他 daraframe 的数据框?由于他想合并两个 csv 文件,我认为 join 可能不适合他
    • 他想按第一列“年份”合并,所以我使用了连接。可能是他希望所有的月份都在列中。@SamuelAlexander
    猜你喜欢
    • 1970-01-01
    • 2018-11-30
    • 1970-01-01
    • 1970-01-01
    • 2021-11-14
    • 2015-01-31
    • 1970-01-01
    • 1970-01-01
    • 2019-03-03
    相关资源
    最近更新 更多