【问题标题】:Converting to RDD fails转换为 RDD 失败
【发布时间】:2021-09-06 07:40:40
【问题描述】:

我的代码如下。我读了一个包含两列的 CSV 文件。通过转换为 RDD 循环遍历 Dataframe 的元素。现在我想为每个元素创建一个 DF。下面的代码失败。谁能帮忙。

    val df1 = spark.read.format("csv").load("c:\\file.csv") //CSV has 3 columns
     
    for (row <- df1.rdd.collect)
     {
       var tab1 =  row.mkString(",").split(",")(0) //Has Tablename
       var tab2 =  row.mkString(",").split(",")(1) //One Select Statment
       var tab3 =  row.mkString(",").split(",")(1) //Another Select Statment      
       
       val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
               
     }
    

我想将 tab2 数据框与 tab3 连接并附加表名。例如

在 tab2 和 tab3 中执行查询给出以下结果。

Col1     col2
---      ---
A         B
C         D
E         F
G         H

我想要如下:

Col0  Col1  Col2
----  ----   ---
Tab1   A      B
Tab1   C      D
Tab2   E      F
Tab3   G      h 

现在 tab1 tab2 tab2.. etc 此信息在正在读取的 CSV 文件中。我想将该 col0 转换为数据帧,以便我可以在 Spark Sql 中读取

【问题讨论】:

  • 您能告诉我们为什么要为 CSV 的每一行创建一个数据帧吗?你得到的错误是什么?
  • 我不是每行都制作数据帧,而是希望每列都有。实际上,csv 的每一列都有一个我需要执行的 Oracle 查询。
  • 对不起,我不明白你在做什么。在这里,您循环遍历 rdd 的所有行,并为每个行创建一个数据框。您的代码可能不起作用,因为spark.createDataFrame 需要一个元组序列。例如,您能否提供一个示例输入和预期输出?
  • 非常抱歉。你想要做什么对我来说仍然不清楚。 csv文件的内容是什么?
  • 我正在从 tab2 和 tab3 创建一个临时视图。做一个交叉连接。现在我想再次与 tab1 交叉连接。我怎样才能做到这一点。希望这很清楚

标签: scala entity-framework apache-spark


【解决方案1】:

我能够在下面解决我的替换问题:

val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working

val newDf = spark.sparkContext.parallelize(Seq(newdf)).toDF("Col")

【讨论】:

    猜你喜欢
    • 2016-01-07
    • 2021-02-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-25
    • 1970-01-01
    • 1970-01-01
    • 2020-01-24
    相关资源
    最近更新 更多