【发布时间】:2015-08-20 06:07:29
【问题描述】:
是否可以将 spark 中的DataFrame 直接保存到 Hive 中?
我尝试将DataFrame 转换为Rdd,然后另存为文本文件,然后加载到配置单元中。但是我想知道我是否可以直接将dataframe保存到hive
【问题讨论】:
标签: scala apache-spark hive apache-spark-sql
是否可以将 spark 中的DataFrame 直接保存到 Hive 中?
我尝试将DataFrame 转换为Rdd,然后另存为文本文件,然后加载到配置单元中。但是我想知道我是否可以直接将dataframe保存到hive
【问题讨论】:
标签: scala apache-spark hive apache-spark-sql
您可以使用 sqlContext 创建内存中的临时表并将它们存储在 hive 表中。
假设您的数据框是 myDf。您可以使用创建一个临时表,
myDf.createOrReplaceTempView("mytempTable")
然后您可以使用简单的 hive 语句来创建表并转储临时表中的数据。
sqlContext.sql("create table mytable as select * from mytempTable");
【讨论】:
temporary 表与hive 表混合和匹配的?在执行show tables 时,它仅包含我的spark 2.3.0 安装的hive 表
使用DataFrameWriter.saveAsTable。 (df.write.saveAsTable(...)) 见Spark SQL and DataFrame Guide。
【讨论】:
df.write().saveAsTable(tableName)是否也会将流数据写入表中?
我没有在 Spark 2.0 文档中看到 df.write.saveAsTable(...) 已弃用。它在 Amazon EMR 上对我们有用。我们完全能够将数据从 S3 读取到数据帧中,对其进行处理,从结果中创建一个表格并使用 MicroStrategy 读取它。
不过,Vinays 的回答也奏效了。
【讨论】:
你需要拥有/创建一个 HiveContext
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
然后直接保存dataframe或者选择列存储为hive表
df 是数据帧
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
或
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
或
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes 是 Append/Ignore/Overwrite/ErrorIfExists
我在这里添加了 Spark 文档中 HiveContext 的定义,
除了基本 SQLContext 之外,您还可以创建 HiveContext,它提供基本 SQLContext 提供的功能的超集。其他功能包括使用更完整的 HiveQL 解析器编写查询的能力、对 Hive UDF 的访问以及从 Hive 表读取数据的能力。要使用 HiveContext,您不需要现有的 Hive 设置,并且 SQLContext 可用的所有数据源仍然可用。 HiveContext 仅单独打包,以避免在默认 Spark 构建中包含 Hive 的所有依赖项。
在 Spark 版本 1.6.2 上,使用“dbName.tableName”会出现此错误:
org.apache.spark.sql.AnalysisException:临时表不允许指定数据库名称或其他限定符。如果表名中有点(.),请用反引号()引用表名。`
【讨论】:
df.write().mode...需要改成df.write.mode...
抱歉,帖子写得太晚了,但我没有看到任何可接受的答案。
df.write().saveAsTable 将抛出 AnalysisException 并且与 HIVE 表不兼容。
将 DF 存储为 df.write().format("hive") 应该可以解决问题!
但是,如果这不起作用,那么按照之前的 cmets 和答案,这是我认为最好的解决方案(尽管接受建议)。
最好的办法是显式创建 HIVE 表(包括 PARTITIONED 表),
def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
"PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}
将DF保存为临时表,
df.createOrReplaceTempView("$tempTableName")
并插入 PARTITIONED HIVE 表:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)
DF 中的 LAST COLUMN 将是 PARTITION COLUMN,因此请相应地创建 HIVE 表!
如果有效请评论!与否。
--更新--
df.write()
.partitionBy("$partition_column")
.format("hive")
.mode(SaveMode.append)
.saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
【讨论】:
保存到 Hive 只需使用 SQLContext 的 write() 方法即可:
df.write.saveAsTable(tableName)
从 Spark 2.2 开始:使用 DataSet 代替 DataFrame。
【讨论】:
From Spark 2.2: use DataSet instead DataFrame.
对于 Hive 外部表,我在 PySpark 中使用此函数:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
print("Saving result in {}.{}".format(database, table_name))
output_schema = "," \
.join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
.replace("StringType", "STRING") \
.replace("IntegerType", "INT") \
.replace("DateType", "DATE") \
.replace("LongType", "INT") \
.replace("TimestampType", "INT") \
.replace("BooleanType", "BOOLEAN") \
.replace("FloatType", "FLOAT")\
.replace("DoubleType","FLOAT")
output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)
sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))
query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
.format(database, table_name, output_schema, save_format, database, table_name)
sparkSession.sql(query)
dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
【讨论】:
你可以像这样使用 Hortonworks spark-llap 库
import com.hortonworks.hwc.HiveWarehouseSession
df.write
.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
.mode("append")
.option("table", "myDatabase.myTable")
.save()
【讨论】:
这是从 parquet 文件创建 Hive 表的 PySpark 版本。您可能已经使用推断模式生成了 Parquet 文件,现在想要将定义推送到 Hive 元存储。您还可以将定义推送到 AWS Glue 或 AWS Athena 等系统,而不仅仅是 Hive 元存储。这里我使用 spark.sql 来推送/创建永久表。
# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")
cols = df.dtypes
buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')
keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print ("size----------",sizeof)
count=1;
for eachvalue in keyanddatatypes:
print count,sizeof,eachvalue
if count == sizeof:
total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
else:
total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
buf.append(total)
count = count + 1
buf.append(' )')
buf.append(' STORED as parquet ')
buf.append("LOCATION")
buf.append("'")
buf.append('s3://my-location/data/')
buf.append("'")
buf.append("'")
##partition by pt
tabledef = ''.join(buf)
print "---------print definition ---------"
print tabledef
## create a table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef);
【讨论】:
在我的情况下,这很好用:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
完成!!
您可以读取数据, 让你以“员工”的身份给予
hive.executeQuery("select * from Employee").show()
欲了解更多详情,请使用此网址: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
【讨论】:
如果您想从数据框创建一个配置单元表(不存在)(有时它无法使用
DataFrameWriter.saveAsTable创建)。StructType.toDDL将有助于将列列为字符串。
val df = ...
val schemaStr = df.schema.toDDL # This gives the columns
spark.sql(s"""create table hive_table ( ${schemaStr})""")
//Now write the dataframe to the table
df.write.saveAsTable("hive_table")
hive_table 将在默认空间中创建,因为我们没有在 spark.sql() 提供任何数据库。 stg.hive_table 可用于在stg 数据库中创建hive_table。
【讨论】: