【发布时间】:2016-04-27 23:30:02
【问题描述】:
我有一个 pyspark 应用程序。我将一个 hive 表复制到我的 hdfs 目录中,并且在 python 中我 sqlContext.sql 对该表进行了查询。现在这个变量是一个我称之为rows的数据框。我需要随机打乱rows,所以我必须将它们转换为行列表rows_list = rows.collect()。然后我shuffle(rows_list) 将列表重新排列到位。我取了我需要的随机行数量x:
for r in range(x):
allrows2add.append(rows_list[r])
现在我想将 allrows2add 保存为一个 hive 表或附加一个现有的 hive 表(以更容易做的为准)。问题是我不能这样做:
all_df = sc.parallelize(allrows2add).toDF() 不能这样做,无法推断架构
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
没有放入整个架构。 rows 的架构有 117 列,所以我不想把它们打出来。有没有办法提取rows 的架构来帮助我制作 allrows2add 数据框或以某种方式保存为配置单元表?
我可以
rows.printSchema() 但不确定如何将其转换为模式格式作为变量传递 toDF() 而无需解析所有文本
谢谢
添加循环信息
#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
【问题讨论】:
标签: python hive pyspark pyspark-sql