【问题标题】:Converting JavaRDD to DataFrame in Spark java在 Spark java 中将 JavaRDD 转换为 DataFrame
【发布时间】:2017-05-09 05:33:24
【问题描述】:

我正在尝试处理日志文件。首先,我读取日志文件并根据我的要求拆分这些文件,并将每一列保存到单独的 JavaRDD 中。现在我需要将这些 JavaRDD 转换为 DataFrames 以供将来操作。这是我到目前为止尝试的代码:

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

这就是我尝试将 JavaRDD 转换为 DataFrame 的方式:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

但上面的行不起作用。我对 Model.class 感到困惑。

谁能推荐我。

谢谢。

【问题讨论】:

    标签: java apache-spark hadoop apache-spark-sql


    【解决方案1】:

    进口:

    import java.io.Serializable;
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    

    为 URL 创建一个 POJO 类。我建议您为包含 url、日期、时间、方法、目标等作为成员的 Log 行编写

    public static class Url implements Serializable {
      private String value;
    
      public String getValue() {
        return value;
      }
    
      public void setValue(String value) {
        this.value = value;
      }
    }  
    

    从文本文件中创建一个 Url 对象的 RDD

    JavaRDD<Url> urlsRDD = spark.read()
      .textFile("/Users/karuturi/Downloads/log.txt")
      .javaRDD()
      .map(new Function<String, Url>() {
        @Override
        public Url call(String line) throws Exception {
          String[] parts = line.split("\\t");
          Url url = new Url();
          url.setValue(parts[0].replaceAll("[", ""));
          return url;
        }
      });
    

    从 RDD 创建 DataFrame

    Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
    

    RDD to DataFrame - Spark 2.0
    RDD to DataFrame - Spark 1.6

    【讨论】:

    • 如果我想用SparseVector 转换一个JavaRDD 怎么办?
    【解决方案2】:

    您可以执行以下操作(我正在从 scala 即时转换,请原谅任何拼写错误):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            return RowFactory.create(record());
        }
    }
    // now you wish to create the target schema. This is basically a list of
    // fields (each field would be a column) which you are adding to a StructType
    List<StructField> fields = new ArrayList<>();
    StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
    fields.add(field);
    StructType schema = DataTypes.createStructType(fields);
    
    // now you can create the dataframe:
    DataFrame df= sqlContext.createDataFrame(rowRDD, schema);    
    

    一些补充说明:

    • 为什么只取第一个元素时要进行平面映射?你可以简单地完成:

      JavaRDD&lt;String&gt; urlrdd=diskfile.flatMap(line -&gt; line.split("\t")[0]);

    • 我假设在现实生活中您希望从 url 中删除“[”(您可以在地图中轻松地做到这一点)。

    • 如果您要迁移到 spark 2.0 或更高版本,那么您应该使用 spark session (spark) 而不是 sqlContext。

    • 您可以创建包含所有列的单个数据框。您可以通过将所有字段添加到架构中来做到这一点(即,而不是只对字段进行一次添加,而是添加所有字段)。不要使用 urlrdd,而是使用磁盘文件并在“公共行调用”创建中进行拆分。这将是这样的:

      JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() { @override public Row call(String record) throws Exception { String[] recs = record.split("\t") return RowFactory.create(recs[0], recs[1], ...); } });

    • 您可以直接创建它:只需使用

      sqlContext.read.option("sep","\t").csv.load(filename,schema)

    【讨论】:

      【解决方案3】:

      只需根据 7 列表对数据进行平面映射,然后使用下面的代码 sn-p

      String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"};
      List<String> tableColumns = Arrays.asList(columns);
      
      StrucType schema = createSchema(tableColumns);
      
          public StructType createSchema(List<String> tableColumns){
      
              List<StructField> fields  = new ArrayList<StructField>();
              for(String column : tableColumns){         
      
                      fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));            
      
              }
              return DataTypes.createStructType(fields);
          }
      
      sqlContext.createDataFrame(urlRDD, schema);
      

      【讨论】:

        【解决方案4】:

        可以直接使用sqlContext直接读取文件

        使用sqlContext的read方法

        欲了解更多信息,您可以点击此链接

        https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes

        或者你可以导入

        import sqlContext.implicits.*;
        

        然后在rdd上使用toDF()方法转换成dataframe。

        【讨论】:

        • import sqlContext.implicits._ 命令在 spark java 中不支持
        • 是的,抱歉刚刚看到这个。最好的选择是使用 sqlContext 来读取文件。因为将 rdd 转换为 dataframe 使用反射,所以为了减少额外的计算使用 sqlContext 来读取文件。
        猜你喜欢
        • 2021-04-12
        • 2017-03-25
        • 1970-01-01
        • 1970-01-01
        • 2016-07-24
        • 2017-03-17
        • 1970-01-01
        • 2020-06-01
        • 2022-01-08
        相关资源
        最近更新 更多