【问题标题】:How to save DataFrame directly to Hive?如何将 DataFrame 直接保存到 Hive?
【发布时间】:2015-08-20 06:07:29
【问题描述】:

是否可以将 spark 中的DataFrame 直接保存到 Hive 中?

我尝试将DataFrame 转换为Rdd,然后另存为文本文件,然后加载到配置单元中。但是我想知道我是否可以直接将dataframe保存到hive

【问题讨论】:

    标签: scala apache-spark hive apache-spark-sql


    【解决方案1】:

    您可以使用 sqlContext 创建内存中的临时表并将它们存储在 hive 表中。

    假设您的数据框是 myDf。您可以使用创建一个临时表,

    myDf.createOrReplaceTempView("mytempTable") 
    

    然后您可以使用简单的 hive 语句来创建表并转储临时表中的数据。

    sqlContext.sql("create table mytable as select * from mytempTable");
    

    【讨论】:

    • 这解决了我在 spark 2.0 中使用 write.saveAsTable 时遇到的镶木地板读取错误
    • 是的。但是,我们可以在创建临时表之前对数据框使用分区。 @chhantyal
    • 您是如何将temporary 表与hive 表混合和匹配的?在执行show tables 时,它仅包含我的spark 2.3.0 安装的hive
    • 此临时表将保存到您的配置单元上下文中,并且不属于任何配置单元表。
    • 嗨@VinayKumar 为什么你说“如果你使用 saveAsTable(它更像是持久化你的数据帧),你必须确保你有足够的内存分配给你的 spark 应用程序”。你能解释一下吗?
    【解决方案2】:

    使用DataFrameWriter.saveAsTable。 (df.write.saveAsTable(...)) 见Spark SQL and DataFrame Guide

    【讨论】:

    • saveAsTable 不创建 Hive 兼容表。我找到的最佳解决方案是 Vinay Kumar。
    • @Jacek:我自己添加了这个注释,因为我认为我的回答是错误的。我会删除它,除非它被接受。你认为注释有误吗?
    • 是的。注释是错误的,这就是我删除它的原因。 “如果我错了,请纠正我”在这里适用:)
    • 这个df.write().saveAsTable(tableName)是否也会将流数据写入表中?
    • 不,您无法使用 saveAsTable 保存流数据,它甚至不在 api 中
    【解决方案3】:

    我没有在 Spark 2.0 文档中看到 df.write.saveAsTable(...) 已弃用。它在 Amazon EMR 上对我们有用。我们完全能够将数据从 S3 读取到数据帧中,对其进行处理,从结果中创建一个表格并使用 MicroStrategy 读取它。 不过,Vinays 的回答也奏效了。

    【讨论】:

    • 由于长度和内容的原因,有人将此答案标记为低质量。老实说,作为评论可能会更好。我想它已经存在两年了,有些人发现它很有帮助,所以保持原样可能会更好?
    • 我同意,评论会是更好的选择。经验教训:-)
    【解决方案4】:

    你需要拥有/创建一个 HiveContext

    import org.apache.spark.sql.hive.HiveContext;
    
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
    

    然后直接保存dataframe或者选择列存储为hive表

    df 是数据帧

    df.write().mode("overwrite").saveAsTable("schemaName.tableName");
    

    df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
    

    df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
    

    SaveModes 是 Append/Ignore/Overwrite/ErrorIfExists

    我在这里添加了 Spark 文档中 HiveContext 的定义,

    除了基本 SQLContext 之外,您还可以创建 HiveContext,它提供基本 SQLContext 提供的功能的超集。其他功能包括使用更完整的 HiveQL 解析器编写查询的能力、对 Hive UDF 的访问以及从 Hive 表读取数据的能力。要使用 HiveContext,您不需要现有的 Hive 设置,并且 SQLContext 可用的所有数据源仍然可用。 HiveContext 仅单独打包,以避免在默认 Spark 构建中包含 Hive 的所有依赖项。


    在 Spark 版本 1.6.2 上,使用“dbName.tableName”会出现此错误:

    org.apache.spark.sql.AnalysisException:临时表不允许指定数据库名称或其他限定符。如果表名中有点(.),请用反引号()引用表名。`

    【讨论】:

    • 是第二个命令:'df.select(df.col("col1"),df.col("col2"), df.col("col3")).write()。 mode("覆盖").saveAsTable("schemaName.tableName");'要求您打算覆盖的选定列已存在于表中?所以你有现有的表,你只用你的 df 在 spark 中的新数据覆盖现有的列 1,2,3?这解释对吗?
    • df.write().mode...需要改成df.write.mode...
    【解决方案5】:

    抱歉,帖子写得太晚了,但我没有看到任何可接受的答案。

    df.write().saveAsTable 将抛出 AnalysisException 并且与 HIVE 表不兼容。

    将 DF 存储为 df.write().format("hive") 应该可以解决问题!

    但是,如果这不起作用,那么按照之前的 cmets 和答案,这是我认为最好的解决方案(尽管接受建议)。

    最好的办法是显式创建 HIVE 表(包括 PARTITIONED 表),

    def createHiveTable: Unit ={
    spark.sql("CREATE TABLE $hive_table_name($fields) " +
      "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
    }
    

    将DF保存为临时表,

    df.createOrReplaceTempView("$tempTableName")

    并插入 PARTITIONED HIVE 表:

    spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
    spark.sql("select * from default.$hive_table_name").show(1000,false)
    

    DF 中的 LAST COLUMN 将是 PARTITION COLUMN,因此请相应地创建 HIVE 表!

    如果有效请评论!与否。


    --更新--

    df.write()
      .partitionBy("$partition_column")
      .format("hive")
      .mode(SaveMode.append)
      .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
    

    【讨论】:

      【解决方案6】:

      保存到 Hive 只需使用 SQLContext 的 write() 方法即可:

      df.write.saveAsTable(tableName)
      

      https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

      从 Spark 2.2 开始:使用 DataSet 代替 DataFrame。

      【讨论】:

      • 我似乎有一个错误,指出作业已中止。我尝试了以下代码 pyspark_df.write.mode("overwrite").saveAsTable("InjuryTab2")
      • 嗨!为什么这个? From Spark 2.2: use DataSet instead DataFrame.
      【解决方案7】:

      对于 Hive 外部表,我在 PySpark 中使用此函数:

      def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
          print("Saving result in {}.{}".format(database, table_name))
          output_schema = "," \
              .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
              .replace("StringType", "STRING") \
              .replace("IntegerType", "INT") \
              .replace("DateType", "DATE") \
              .replace("LongType", "INT") \
              .replace("TimestampType", "INT") \
              .replace("BooleanType", "BOOLEAN") \
              .replace("FloatType", "FLOAT")\
              .replace("DoubleType","FLOAT")
          output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)
      
          sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
      
          query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
              .format(database, table_name, output_schema, save_format, database, table_name)
          sparkSession.sql(query)
          dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
      

      【讨论】:

        【解决方案8】:

        你可以像这样使用 Hortonworks spark-llap

        import com.hortonworks.hwc.HiveWarehouseSession
        
        df.write
          .format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
          .mode("append")
          .option("table", "myDatabase.myTable")
          .save()
        

        【讨论】:

          【解决方案9】:

          这是从 parquet 文件创建 Hive 表的 PySpark 版本。您可能已经使用推断模式生成了 Parquet 文件,现在想要将定义推送到 Hive 元存储。您还可以将定义推送到 AWS Glue 或 AWS Athena 等系统,而不仅仅是 Hive 元存储。这里我使用 spark.sql 来推送/创建永久表。

             # Location where my parquet files are present.
              df = spark.read.parquet("s3://my-location/data/")
              cols = df.dtypes
              buf = []
              buf.append('CREATE EXTERNAL TABLE test123 (')
              keyanddatatypes =  df.dtypes
              sizeof = len(df.dtypes)
              print ("size----------",sizeof)
              count=1;
              for eachvalue in keyanddatatypes:
                  print count,sizeof,eachvalue
                  if count == sizeof:
                      total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
                  else:
                      total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
                  buf.append(total)
                  count = count + 1
          
              buf.append(' )')
              buf.append(' STORED as parquet ')
              buf.append("LOCATION")
              buf.append("'")
              buf.append('s3://my-location/data/')
              buf.append("'")
              buf.append("'")
              ##partition by pt
              tabledef = ''.join(buf)
          
              print "---------print definition ---------"
              print tabledef
              ## create a table using spark.sql. Assuming you are using spark 2.1+
              spark.sql(tabledef);
          

          【讨论】:

            【解决方案10】:

            在我的情况下,这很好用:

            from pyspark_llap import HiveWarehouseSession
            hive = HiveWarehouseSession.session(spark).build()
            hive.setDatabase("DatabaseName")
            df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
            df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
            

            完成!!

            您可以读取数据, 让你以“员工”的身份给予

            hive.executeQuery("select * from Employee").show()
            

            欲了解更多详情,请使用此网址: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

            【讨论】:

              【解决方案11】:

              如果您想从数据框创建一个配置单元表(不存在)(有时它无法使用DataFrameWriter.saveAsTable 创建)。 StructType.toDDL 将有助于将列列为字符串。

              val df = ...
              
              val schemaStr = df.schema.toDDL # This gives the columns 
              spark.sql(s"""create table hive_table ( ${schemaStr})""")
              
              //Now write the dataframe to the table
              df.write.saveAsTable("hive_table")
              

              hive_table 将在默认空间中创建,因为我们没有在 spark.sql() 提供任何数据库。 stg.hive_table 可用于在stg 数据库中创建hive_table

              【讨论】:

              猜你喜欢
              • 2016-11-04
              • 2018-08-13
              • 2018-01-09
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2017-05-10
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多