【问题标题】:Scala Spark udf java.lang.UnsupportedOperationExceptionScala Spark udf java.lang.UnsupportedOperationException
【发布时间】:2018-11-22 01:47:12
【问题描述】:

我创建了这个柯里化函数来检查 udf 中 endDateStr 的空值,代码如下:(col x 的类型是 ArrayType[TimestampType]):

    def _getCountAll(dates: Seq[Timestamp]) = Option(dates).map(_.length)
    def _getCountFiltered(endDate: Timestamp)(dates: Seq[Timestamp]) = Option(dates).map(_.count(!_.after(endDate)))

    val getCountUDF = udf((endDateStr: Option[String]) => {
      endDateStr match {
        case None => _getCountAll _
        case Some(value) => _getCountFiltered(Timestamp.valueOf(value + " 23:59:59")) _
      }
    })
    df.withColumn("distinct_dx_count", getCountUDF(lit("2009-09-10"))(col("x")))

但是我在执行时遇到了这个异常:

java.lang.UnsupportedOperationException:类型的架构 Seq[java.sql.Timestamp] => Option[Int] 不支持

谁能帮我找出我的错误?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    你不能像这样咖喱udf。如果你想要类似 curry 的行为,你应该从外部函数返回 udf

    def getCountUDF(endDateStr: Option[String]) = udf {
      endDateStr match {
        case None => _getCountAll _
        case Some(value) => 
          _getCountFiltered(Timestamp.valueOf(value + " 23:59:59")) _
      }
    }
    
    df.withColumn("distinct_dx_count", getCountUDF(Some("2009-09-10"))(col("x")))
    

    否则只需放弃柯里化并同时提供两个参数:

    val getCountUDF = udf((endDateStr: String, dates: Seq[Timestamp]) => 
      endDateStr match {
        case null => _getCountAll(dates)
        case _ => 
          _getCountFiltered(Timestamp.valueOf(endDateStr + " 23:59:59"))(dates)
      }
    )
    
    df.withColumn("distinct_dx_count", getCountUDF(lit("2009-09-10"), col("x")))
    

    【讨论】:

      猜你喜欢
      • 2016-12-02
      • 2021-12-07
      • 2021-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-21
      • 1970-01-01
      • 2018-09-17
      相关资源
      最近更新 更多