【问题标题】:Saving dataframe to .txt or .csv file将数据框保存到 .txt 或 .csv 文件
【发布时间】:2020-03-29 09:23:19
【问题描述】:

我正在研究一种机器学习算法来预测以太坊的价格。我已经有一个小数据集,我正在为它做预测。我可以在终端中打印预测,并且可以看到它们。但是,我无法将它们保存为文本/csv 文件。这是我的代码

from pyspark.sql.types import *
from pyspark.sql import Row, SparkSession
from pyspark.mllib.util import MLUtils
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import DateType

from pyspark import SparkContext
import pyspark
import datetime

sc = pyspark.SparkContext()
spark = SparkSession.builder.appName('Ethereum').getOrCreate()
 #get csv file as a DataFram object
data = spark.read.csv('hdfs://andromeda.student.eecs.qmul.ac.uk/user/cln31/ethereum', header=True,inferSchema=True)

#DataFrame type
data = data.select(data.date.cast("int"),
                    data.PriceBTC.cast("float"),
                    data.PriceUSD.cast("float"),
                    data.TxCnt.cast("float"),
                    data.TxTfrValMedUSD.cast("float"),
                    data.CapMrktCurUSD.cast("float"),
                    data.IssContUSD.cast("float"),
                    data.TxTfrValMeanUSD.cast("float"),
                    data.TxTfrValUSD.cast("float"))


data.printSchema()

featureassembler=VectorAssembler(inputCols=["date","TxTfrValMedUSD","CapMrktCurUSD","TxCnt","TxTfrValUSD", "IssContUSD", "TxTfrValMeanUSD"],outputCol="Independent Features")
output = featureassembler.setHandleInvalid("skip").transform(data)
output.show()

output.select("Independent Features").show()

finalized_data=output.select("Independent Features","PriceUSD")
finalized_data.show()

train_data,test_data=finalized_data.randomSplit([0.75,0.25])

regressor=LinearRegression(featuresCol='Independent Features', labelCol='PriceUSD')
regressor=regressor.fit(train_data)

test_data1 = output.filter(data.date >= 1455408000) #2016.02.14
test_data1 = test_data1.filter(test_data1.date <= 1561852800) #2019.06.30

test_data1 = test_data1.select("Independent Features","PriceUSD")

test_data1.show()


pred_results=regressor.evaluate(test_data1)
pred_results.predictions.describe().show()

pred_results.predictions.write.csv("partCOut.csv")

我基本上想保存 pred_results.predictions 的输出。这是我得到的错误:

pyspark.sql.utils.AnalysisException: u'CSV数据源不支持struct&lt;type:tinyint,size:int,indices:array&lt;int&gt;,values:array&lt;double&gt;&gt;数据类型。;'

【问题讨论】:

  • 那么我该怎么做才能将它保存为文本文件?
  • 你把每一列都转换成字符串*
  • 或另存为json
  • 正如@Steven 提到的,保持结构的最佳方法是将其保存为json,否则您将需要转换每一列或解析数组以将其保存为字符串列。跨度>

标签: apache-spark hadoop pyspark output bigdata


【解决方案1】:

听起来很奇怪。我最近处理过这样的事情。我认为问题在于我试图保存的对象不是实际的数据框。将其转换为数据框解决了问题,然后我可以保存文件(在我的场景中,它会转到 SQL 服务器表)。

试试这样的。

# your code...
df = pd.DataFrame(mylist)

display(df)

# convert python df to spark df
spark_df = spark.createDataframe(df)
# write df out as table
spark_df.write.csv("/rawdata/AAA.csv")

要检查对象的类型,试试这个。

z = []
type(z)
<type 'list'>

z = ()
type(z)
<type 'tuple'>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-09-11
    • 2015-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-19
    • 1970-01-01
    相关资源
    最近更新 更多