【问题标题】:pyspark dataframe column : Hive columnpyspark 数据框列:Hive 列
【发布时间】:2018-10-05 22:37:26
【问题描述】:

我有一个 Hive 表如下:

hive> describe stock_quote;
OK
tickerid                string                                      
tradeday                string                                      
tradetime               string                                      
openprice               string                                      
highprice               string                                      
lowprice                string                                      
closeprice              string                                      
volume                  string

以下 Spark 代码读取 csv 文件并尝试将记录插入 Hive 表:

sc = spark.sparkContext
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line : line.split(','))
rows_map = rows.map(lambda row : Row(TickerId = row[0], TradeDay = row[1], TradeTime = row[2], OpenPrice = row[3], HighPrice = row[4], LowPrice = row[5], ClosePrice = row[6], Volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')

我面临的问题是,当我在数据帧上调用 show() 函数时,它会按字母顺序打印列,如下所示

|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|

,在表中,将 ClosePrice(DF 中的第 1 列)的值插入 TickerId(Hive 表中的第 1 列)列,将 HighPrice 的值插入 TradeDay 列等。

试图在数据帧上调用 select() 函数,但没有帮助。 尝试将列名列表如下:

rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])

上面更改了列名的顺序,但值保持在同一位置,这更不正确。

任何帮助将不胜感激。

【问题讨论】:

    标签: apache-spark dataframe hive


    【解决方案1】:

    您应该使用namedtuple 而不是Row,因为'Row' 尝试对列名进行排序。因此排序的列名与default.stock_quote表的列顺序不匹配请查看What is the Scala case class equivalent in PySpark?了解更多详情

    所以你应该这样做

    from collections import namedtuple
    
    table = namedtuple('table', ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
    rows_map = rows.map(lambda row : table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
    

    正如@user6910411 建议的那样,“一个普通的元组也可以

    rows_map = rows.map(lambda row : (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
    rows_df = spark.createDataFrame(rows_map, ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
    

    现在insertInto 应该可以工作了

    【讨论】:

      【解决方案2】:

      您也可以使用saveAsTable 代替insertInto

      来自docs

      insertInto 不同,saveAsTable 将使用列名来查找正确的列位置

      【讨论】:

      • 非常感谢。有效。然而,我面临的一个挑战是,它引发了如下错误:“现有表 default.stock_quote 的格式是 HiveFileFormat。当我尝试执行以下行时,它与指定的格式 TextFileFormat 不匹配代码: rows_df.write.format('text').mode('append').saveAsTable('stock_quote')。有趣的是,当我更改为 format('hive') 时,记录插入成功。尝试使用 format(' text'),但同样的错误。有什么想法吗?
      【解决方案3】:

      它是如何按字母顺序排序的? csv文件是这样的吗?

      无论如何,我会按照以下步骤进行:

      • 从表格中选择列
      • 根据表中的列重新排列数据框
      # pyspark below
      list_columns = spark.sql('select * from table').columns # there might be simpler way
      dataframe.select(*list_columns)
      

      【讨论】:

        猜你喜欢
        • 2022-01-01
        • 2018-11-19
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-07-11
        • 2017-08-12
        • 2021-08-02
        • 2023-03-31
        相关资源
        最近更新 更多