【问题标题】:Reading parquet file in Spark from S3从 S3 读取 Spark 中的镶木地板文件
【发布时间】:2017-01-22 19:01:55
【问题描述】:

我正在从 S3 以 parquet 格式读取数据,然后我将此数据处理为 DataFrame。 问题是如何有效地迭代DataFrame 中的行?我知道collect 方法将数据加载到内存中,所以,虽然我的DataFrame 不大,但我宁愿避免将完整的数据集加载到内存中。我怎样才能优化给定的代码? 另外,我正在使用索引来访问DataFrame 中的列。我可以通过列名访问它们吗(我知道它们)?

DataFrame parquetFile = sqlContext.read().parquet("s3n://"+this.aws_bucket+"/"+this.aws_key_members);
parquetFile.registerTempTable("mydata");
DataFrame eventsRaw = sqlContext.sql("SELECT * FROM mydata");
Row[] rddRows = eventsRaw.collect();
for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx)
{
   Map<String, String> props = new HashMap<>();
   props.put("field1", rddRows[rowIdx].get(0).toString());
   props.put("field2", rddRows[rowIdx].get(1).toString());
   // further processing
}

【问题讨论】:

    标签: java apache-spark amazon-s3 spark-dataframe


    【解决方案1】:

    您可以在 spark 中使用Map 函数。 您可以在不收集数据集/数据框的情况下迭代整个数据框:

    Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age 
    BETWEEN 13 AND 19");
    Dataset<String> namesDS = namesDF.map((MapFunction<Row, String>) row -> "Name:" + row.getString(0),Encoders.STRING());
    
    namesDS.show();
    

    如果你正在做的操作很复杂,你可以创建一个地图函数:

     // Map function
     Row doSomething(Row row){
       // get column
         String field = row.getAs(COLUMN)
    // construct a new row and add all the existing/modified columns in the row .  
    return row.
        }
    

    现在这个map函数可以调用到dataframe的map函数中了:

    StructType structType = dataset.schema();
    namesDF.map((MapFunction<Row, Row>)dosomething,
            RowEncoder.apply(structType))
    

    来源:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-12-07
      • 2021-12-28
      • 2021-08-27
      • 2019-01-09
      • 1970-01-01
      • 2020-08-15
      • 1970-01-01
      • 2019-10-27
      相关资源
      最近更新 更多