【问题标题】:How to create a Row from a List or Array in Spark using Scala如何使用 Scala 从 Spark 中的列表或数组创建行
【发布时间】:2015-01-23 10:43:50
【问题描述】:

我正在尝试根据用户输入创建一个行 (org.apache.spark.sql.catalyst.expressions.Row)。我无法随机创建行。

是否有从ListArray 创建行的功能。

例如,如果我有一个具有以下格式的.csv 文件,

"91xxxxxxxxxx,21.31,15,0,0"

如果用户输入[1, 2],那么我只需要获取第二列和第三列以及第一列customer_id

我尝试用代码解析它:

val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => (foo(input,r(0)))) `

其中 foo 被定义为

def f(n: List[Int], s: String) : Row = {
    val n = input.length
    var out = new Array[Any](n+1)
    var r = s.split(",")
    out(0) = r(0)
    for (i <- 1 to n)
        out(i) = r(input(i-1)).toDouble
    Row(out)
}

输入是一个列表说

val input = List(1,2)

执行此代码我得到 l3 为:

Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])

但我想要的是:

Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([9xxxxxxxxxx,21.31,15])`

必须传递此参数才能在 Spark SQL 中创建架构

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    类似下面的东西应该可以工作:

    import org.apache.spark.sql._
    
    def f(n: List[Int], s: String) : Row =
      Row.fromSeq(s.split(",").zipWithIndex.collect{case (a,b) if n.contains(b) => a}.toSeq)
    

    【讨论】:

    • 如果我想将它解析为一行 3 个字符串值,这很好用。但是如何使用呢,如果第一个值是一个字符串,那么第二个和第三个值是Double呢?有可能吗?
    【解决方案2】:

    您缺少 StructField 和 StructType 的创建。参考官方指南http://spark.apache.org/docs/latest/sql-programming-guide.html,部分Programmatically Specifying Schema

    我不是 Scala 专家,但在 Python 中它看起来像这样:

    from pyspark.sql import *
    sqlContext = SQLContext(sc)
    
    input = [1,2]
    
    def parse(line):
        global input
        l = line.split(',')
        res = [l[0]]
        for ind in input:
            res.append(l[ind])
        return res
    
    csv  = sc.textFile("file:///tmp/inputfile.csv")
    rows = csv.map(lambda x: parse(x))
    
    fieldnum = len(input) + 1
    fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)]
    schema = StructType(fields)
    
    csvWithSchema = sqlContext.applySchema(rows, schema)
    csvWithSchema.registerTempTable("test")
    sqlContext.sql("SELECT * FROM test").collect()
    

    简而言之,您不应该直接将它们转换为 Row 对象,只需保留为 RDD 并使用 applySchema 对其应用架构即可

    【讨论】:

    • 不错的解决方案 - 请记住 sqlContext.applySchema 在 spark 2.x 中已弃用,因此最好使用数据框解决方案。
    • 数据嵌套了怎么办?例如。我们有一个StructType?
    • 问题是如何创建Row对象的RDD,这里你创建一个DataFrame
    【解决方案3】:

    你也可以试试:

        Row.fromSeq(line(0).toString ++ line(1).toDouble ++ line(2).toDouble ++ line.slice(2, line.size).map(value => value.toString))
    

    【讨论】:

    • 仅适用于 scala 2.12 以上版本
    猜你喜欢
    • 2017-02-03
    • 1970-01-01
    • 2015-01-17
    • 2020-09-22
    • 2015-01-05
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 2018-06-18
    相关资源
    最近更新 更多