【问题标题】:Spark Scala 2.10 tuple limitSpark Scala 2.10 元组限制
【发布时间】:2016-02-22 22:18:52
【问题描述】:

我有 66 列要处理的 DataFrame(几乎每个列的值都需要以某种方式更改)所以我正在运行以下语句

    val result = data.map(row=> (
        modify(row.getString(row.fieldIndex("XX"))),
        (...)
        )
    )

直到第 66 列。 由于此版本中的 scala 限制为 22 对的最大元组,因此我不能那样执行。 问题是,有什么解决方法吗? 在所有行操作之后,我将其转换为具有特定列名的 df

   result.toDf("c1",...,"c66")
   result.storeAsTempTable("someFancyResult")

“修改”功能只是说明我的观点的一个例子

【问题讨论】:

  • 切换到 scala 2.11 ?
  • 我希望事情可以这么简单,但事实并非如此
  • @Archeg。嗯有趣。从 2.11.0 开始,案例类的限制肯定被取消了
  • @Odomontois 有趣的讨论:news.ycombinator.com/item?id=7621622 似乎只删除了案例类限制。我不认为你可以在不生成代码的情况下删除元组限制,而且他们还没有元组限制,所以很难介绍。案例分类很容易 - 因为它们无论如何都会生成。仍然没有为 arity > 22 的案例类生成 unapply
  • @Silverrose 我认为解决问题的一种方法是尝试使用shapelessspark,但我不知道这可能有多复杂。这似乎是有人尝试:github.com/tresata/spark-columnar

标签: scala apache-spark tuples dataframe limit


【解决方案1】:

绕过它的方法很繁琐,但它确实有效,试试这个示例代码让你开始吧,你会看到有超过 22 列被访问:

object SimpleApp {
  class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable {
    def canEqual(that: Any) = that.isInstanceOf[Record]

    def productArity = 24

    def productElement(n: Int) = n match {
      case 0 => x1
      case 1 => x2
      case 2 => x3
      ...
      case 23 => x24
    }
  }

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Product Test")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc);

    val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x")

    import sqlContext._
    sc.parallelize(record :: Nil).registerAsTable("records")

    sql("SELECT x1 FROM records").collect()
  }
}

【讨论】:

    【解决方案2】:

    如果您所做的只是修改现有 DataFrame 的值,最好使用 UDF 而不是映射到 RDD:

    import org.apache.spark.sql.functions.udf
    
    val modifyUdf = udf(modify)
    data.withColumn("c1", modifyUdf($"c1"))
    

    如果由于某种原因上述不符合您的需求,您可以做的最简单的事情是从RDD[Row] 重新创建DataFrame。比如这样:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, IntegerType}
    
    
    val result: RDD[Row] = data.map(row => {
      val buffer = ArrayBuffer.empty[Any]
    
      // Add value to buffer
      buffer.append(modify(row.getAs[String]("c1")))
    
      // ... repeat for other values
    
      // Build row
      Row.fromSeq(buffer)
    })
    
    // Create schema
    val schema = StructType(Seq(
      StructField("c1", StringType, false),
      // ...  
      StructField("c66", StringType, false)
    ))
    
    sqlContext.createDataFrame(result, schema)
    

    【讨论】:

    • 没错,如果他已经有一个Dataframe,这是更简单的路线
    • @Ewan 看起来是这样。它不仅是最简单的方法,而且效率更高。
    • 非常感谢!这就是拯救我的原因:)
    • @Silverrose 我很高兴听到这个消息 :) 请不要忘记投票 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-12-28
    • 2020-12-17
    • 2018-04-09
    • 2012-10-25
    • 1970-01-01
    • 2018-07-16
    • 2022-07-21
    相关资源
    最近更新 更多