【问题标题】:Passing more than 22 columns to a UDF in Spark Java在 Spark Java 中将超过 22 列传递给 UDF
【发布时间】:2019-01-13 13:19:29
【问题描述】:

我有一个用我的 Spark Java 代码编写的 UDF,我想在其中传递超过 22 列(正好 24 列)。但是 Spark API 最多只允许 22 列,有什么技巧可以覆盖这个限制,或者我可以创建自定义 UDF 函数来覆盖这个限制吗?

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    您可以传递复杂类型的列。最通用的解决方案是 Struct,但您也可以考虑使用 Array 或 Map。

    地图示例中的参数:

        val df = sc.parallelize(Seq(("a","b"),("c","d"), 
          ("e","f"))).toDF("one","two")
    
    
         val myUDF = udf((input:Map[String,String]) => {
          // do something with the input
           input("one")=="a"
           })
    
         df
        .withColumn("udf_args",map(
           lit("one"),$"one",
            lit("two"),$"one"
          )
        )
        .withColumn("udf_result", myUDF($"udf_args"))
         .show()
    

    【讨论】:

      【解决方案2】:

      您可以将列值数组传递给 udf,而不是传递 24 个列值,并且操作将在数组上进行。 这是示例代码:

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.expressions.UserDefinedFunction
      
      case class department(id: Integer, deptname: String)
      import spark.implicits._
      val df1 = Seq(department(1, "physics")
            , department(2, "computer")).toDF()
      val df2 = df1.withColumn("all_col", array($"id", $"deptname"))
      val concat_udf:UserDefinedFunction = udf((all_col_values:Seq[String]) => {
            (all_col_values(0) + "-" + all_col_values(1))
          })
      //apply udf
      val df3 = df2.withColumn("all_col_concat",concat_udf(col("all_col")))
      df3.show()
      

      额外: 如果可以在没有 udf 的情况下对每一行应用匿名函数,您可以尝试这种方式,但不确定它是否满足要求。

      import org.apache.spark.sql.Row
      val df4 = df1.rdd.map{ case Row(id:Integer, deptname:String) => (id, deptname,id.toString()+"-"+deptname)}.
                toDF("id","deptname", "all_col_concat")
      df4.show()
      

      【讨论】:

        【解决方案3】:

        我看到很多答案都是用 scala 写的,正如你在 spark java 中所问的那样,我将用 Java 重写它。答案也可以在任意数量的列中使用。

        import static org.apache.spark.sql.functions.array;
        
        List<Column> cols =  Arrays.asList(new Column[] {ds.select("col1"), ds.select("col2") ...});// all the columns
        Column mergedCol = array(cols.toArray(new Column[cols.size()])); //merge all your cols
        //udf
        UserDefinedFunction myUdf = udf(
            (Seq<Object> seq) -> {
                //you should have 24 Objects here. 
                for (Object o : JavaConverters.seqAsJavaListConverter(seq).asJava()) {                  
                        ...         
                );
            },
            DataTypes.[your data type]);
        //use it as
        ds.select(myUdf.apply(mergedCol));
        

        【讨论】:

          猜你喜欢
          • 2021-12-06
          • 2018-02-02
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2015-10-27
          • 2023-03-06
          • 2017-12-11
          • 2016-06-03
          相关资源
          最近更新 更多