【问题标题】:How to create hive table from Spark data frame, using its schema?如何使用其架构从 Spark 数据框创建配置单元表?
【发布时间】:2017-07-04 20:11:23
【问题描述】:

我想使用我的 Spark 数据框的架构创建一个配置单元表。我该怎么做?

对于固定列,我可以使用:

val CreateTable_query = "Create Table my table(a string, b string, c double)"
sparksession.sql(CreateTable_query) 

但是我的数据框中有很多列,有没有办法自动生成这样的查询?

【问题讨论】:

标签: scala apache-spark hive


【解决方案1】:

假设您使用的是 Spark 2.1.0 或更高版本,并且 my_DF 是您的数据框,

//get the schema split as string with comma-separated field-datatype pairs
StructType my_schema = my_DF.schema();
String columns = Arrays.stream(my_schema.fields())
                       .map(field -> field.name()+" "+field.dataType().typeName())
                       .collect(Collectors.joining(","));

//drop the table if already created
spark.sql("drop table if exists my_table");
//create the table using the dataframe schema
spark.sql("create table my_table(" + columns + ") 
    row format delimited fields terminated by '|' location '/my/hdfs/location'");
    //write the dataframe data to the hdfs location for the created Hive table
    my_DF.write()
    .format("com.databricks.spark.csv")
    .option("delimiter","|")
    .mode("overwrite")
    .save("/my/hdfs/location");

使用临时表的另一种方法

my_DF.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists my_table");
spark.sql("create table my_table as select * from my_temp_table");

【讨论】:

  • 为什么我们需要创建临时表? my_DF.write.saveAsTable(...) 有什么好处吗?
  • stackoverflow.com/questions/30664008/… TL;DR saveastable 不会创建 hive 兼容表。问题特别要求配置单元表......
  • 我会将 field.dataType().typeName() 更改为 field.dataType().sql() 它可以更好地处理复杂/数组类型
  • Scala 翻译val tableColumns = df.schema.filter(_.name != partCol).map(field => field.name + " " + field.dataType.typeName).mkString(",")
【解决方案2】:

根据您的问题,您似乎想使用数据框的架构在配置单元中创建表。但是正如您所说,您在该数据框中有很多列,因此有两个选项

  • 第一个是通过数据框创建直接配置单元表。
  • 第二个是获取此数据框的架构并在 hive 中创建表。

考虑这段代码:

package hive.example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object checkDFSchema extends App {
  val cc = new SparkConf;
  val sc = new SparkContext(cc)
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
  //First option for creating hive table through dataframe 
  val DF = sparkSession.sql("select * from salary")
  DF.createOrReplaceTempView("tempTable")
  sparkSession.sql("Create table yourtable as select * form tempTable")
  //Second option for creating hive table from schema
  val oldDFF = sparkSession.sql("select * from salary")
  //Generate the schema out of dataframe  
  val schema = oldDFF.schema
  //Generate RDD of you data 
  val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
  //Creating new DF from data and schema 
  val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
  newDFwithSchema.createOrReplaceTempView("tempTable")
  sparkSession.sql("create table FinalTable AS select * from tempTable")
}

【讨论】:

  • temp 查看?这似乎是在创建一个临时表 - 不在 hive .. 中?请表明该表实际上是在 in hive 中创建的 - 例如在哪个 hive 数据库中
  • 这个“Create table yourtable as select * from tempTable”命令将在 hive 中创建表,并将“yourtable”作为 hive db 中的表名。这里我没有提到任何数据库名,所以它会创建在默认数据库中。
  • 我做了一些额外的研究:看来你的方法应该是正确的。我怀疑的原因是:它不适合我。我将不得不创建一个关于如何混合内存(临时)表和配置单元表的单独问题
【解决方案3】:

这是从 parquet 文件创建 Hive 表的 PySpark 版本。您可能已经使用推断模式生成了 Parquet 文件,现在想要将定义推送到 Hive 元存储。您还可以将定义推送到 AWS Glue 或 AWS Athena 等系统,而不仅仅是 Hive 元存储。这里我使用 spark.sql 来推送/创建永久表。

 # Location where my parquet files are present.
 df = spark.read.parquet("s3://my-location/data/")

    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);

【讨论】:

    【解决方案4】:

    另一种方法是使用 StructType.. sql、simpleString、TreeString 等可用的方法...

    您可以从 Dataframe 的架构创建 DDL,可以从您的 DDL 创建 Dataframe 的架构 ..

    这是一个例子 - (Till Spark 2.3)

        // Setup Sample Test Table to create Dataframe from
        spark.sql(""" drop database hive_test cascade""")
        spark.sql(""" create database hive_test""")
        spark.sql("use hive_test")
        spark.sql("""CREATE TABLE hive_test.department(
        department_id int ,
        department_name string
        )    
        """)
        spark.sql("""
        INSERT INTO hive_test.department values ("101","Oncology")    
        """)
    
        spark.sql("SELECT * FROM hive_test.department").show()
    
    /***************************************************************/
    

    现在我可以使用 Dataframe。在实际情况下,您将使用 Dataframe Readers 从文件/数据库创建数据框。让我们使用它的模式来创建 DDL

      // Create DDL from Spark Dataframe Schema using simpleString function
    
     // Regex to remove unwanted characters    
        val sqlrgx = """(struct<)|(>)|(:)""".r
     // Create DDL sql string and remove unwanted characters
    
        val sqlString = sqlrgx.replaceAllIn(spark.table("hive_test.department").schema.simpleString, " ")
    
    // Create Table with sqlString
       spark.sql(s"create table hive_test.department2( $sqlString )")
    

    从 Spark 2.4 开始,您可以在 StructType 上使用 fromDDL 和 toDDL 方法 -

    val fddl = """
          department_id int ,
          department_name string,
          business_unit string
          """
    
    
        // Easily create StructType from DDL String using fromDDL
        val schema3: StructType = org.apache.spark.sql.types.StructType.fromDDL(fddl)
    
    
        // Create DDL String from StructType using toDDL
        val tddl = schema3.toDDL
    
        spark.sql(s"drop table if exists hive_test.department2 purge")
    
       // Create Table using string tddl
        spark.sql(s"""create table hive_test.department2 ( $tddl )""")
    
        // Test by inserting sample rows and selecting
        spark.sql("""
        INSERT INTO hive_test.department2 values ("101","Oncology","MDACC Texas")    
        """)
        spark.table("hive_test.department2").show()
        spark.sql(s"drop table hive_test.department2")
    
    

    【讨论】:

    • 当我尝试.toDDL 时,我得到了反射和 NullPointerExceptions。一般来说,我似乎无法在编译时获得 DDL,并且似乎与 spark 隐式或会话存在某种交互。我想输出可以单独运行的配置单元语句(对于外部分区表),但似乎没有办法做到这一点。
    【解决方案5】:

    从 spark 2.4 开始,您可以使用该功能 dataframe.schema.toDDL 获取列名和类型(即使是嵌套结构)

    【讨论】:

    • 我在 pyspark 中找不到这个 - 只有 Scala 吗?
    猜你喜欢
    • 1970-01-01
    • 2017-10-10
    • 1970-01-01
    • 2017-11-14
    • 2017-01-20
    • 2016-02-11
    • 2015-03-22
    • 2019-08-06
    • 2016-05-20
    相关资源
    最近更新 更多