【问题标题】:Spark Dataframe - Schema file definitionSpark Dataframe - 架构文件定义
【发布时间】:2017-09-09 18:14:06
【问题描述】:

我有一个简单的 Spark 应用程序,它旨在读取分隔的文本文件并将它们保存为 parquet 格式。

要求是处理一个平面数据文件(没有标题),该文件将伴随一个架构定义。最终结果是一个可执行的 jar,它作为命令行参数传递。

到目前为止,我查看的示例要么是从标题行推断架构,要么是在代码本身中定义架构。这是如何实现的?

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class SparkCSVApplication {

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    // create Spark Context
    SparkContext context = new SparkContext(conf);
    // create spark Session
    SparkSession sparkSession = new SparkSession(context);

    Dataset<Row> df = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", true)
            .option("inferSchema", true)
            .load("/Users/Chris/Desktop/Meter_Geocode_Data_150215_114551.csv"); //TODO: CMD line arg
                        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

    System.out.println("========== Print Schema ============");
    df.printSchema();
    System.out.println("========== Print Data ==============");
    df.show();
    System.out.println("========== Generate parquet file ==============");
    df.write().parquet("/Users/Chris/Desktop/meter_geocode.parquet");

}

}

【问题讨论】:

    标签: java apache-spark schema parquet


    【解决方案1】:

    inferSchema 不会从标题行中找出数据类型。它会从数据本身中找出数据类型,这是文档中的文本 -

    inferSchema – 从数据中自动推断输入模式。它 需要对数据进行一次额外的传递。如果设置了 None,它使用 默认值,false。

    编辑:

    要将其他文件中定义的架构与现有数据框相关联,您可以通过几种方式以编程方式进行。

    假设您在main.csv 中有数据,并且在名为header.csv 的第二个文件中的所有标题只包含逗号分隔的列名列表。执行以下操作 -

    # read main data file, 
    df = spark.read.csv("main.csv",header=False,inferSchema=True)
    
    # read the file where headers are stored as string
    hrdd = sc.textFile("header.csv")    
    # make a list
    newColumns = hrdd.collect()[0].split(",")
    
    # Method # 1 : renaming all columns one by one 
    
    # first get old column names
    oldColumns = df.columns
    
    if len(oldColumns) == len(newColumns):
        for i,newCol in enumerate(newColumns):
            df = df.withColumnRenamed(oldColumns[i],newCol)
    
    
    or
    # Method # 2 : just create a new dataframe by passing schema which was derived from reading 2nd file.
    
    df = spark.createDataFrame(df.rdd,schema=newColumns)
    

    免责声明:这是用 pyspark 编写的,我相信它在 java 中也足够简单。

    【讨论】:

    • 是的,明白这一点。但是如何通过模式定义文件将数据列关联到标题?这样任何数据集都可以被处理?
    • @chris-finlayson 添加了新答案
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-10
    • 1970-01-01
    • 2021-07-22
    • 2018-05-12
    相关资源
    最近更新 更多