【问题标题】:scala spark dataframe modify column with udf return valuescala spark数据框修改带有udf返回值的列
【发布时间】:2020-08-25 17:07:39
【问题描述】:

我有一个带有时间戳字段的 spark 数据框,我想将其转换为 long 数据类型。我使用了 UDF 并且独立代码工作正常,但是当我插入到需要转换任何时间戳的通用逻辑时,我无法使其正常工作。问题是如何将 UDF 的返回值返回到数据帧专栏

下面是代码sn-p

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test3").getOrCreate();
      import org.apache.spark.sql.functions._
      val sqlContext  = spark.sqlContext
      val df2 = sqlContext.jsonRDD(spark.sparkContext.parallelize(Array(
        """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
        """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      )))

      val convertTimeStamp = udf { (manTs :java.sql.Timestamp) =>
        manTs.getTime
      }

        df2.withColumn("manufacture_ts",getTime(df2("manufacture_ts"))).show

       +-----+----------+-----+--------------+-----+----+
        |     |No Comment|Tesla| 1508126400000|    S|2012|
        |     |   Get one| Ford| 1508126400000| E350|1997|
        |     |          |Chevy| 1508126400000| Volt|2015|
        +-----+----------+-----+--------------+-----+----+ 

    Now i want to invoke this from a dataframe to be clled on all columns which are of type long

    object Test4 extends App{

        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate();
        import spark.implicits._

        import scala.collection.JavaConversions._    
        val long : Long  = "1508299200000".toLong    

        val data = Seq(Row("10000020_LUX_OTC",long,"2020-02-14"))

        val schema = List( StructField("rowkey",StringType,true)
                                  ,StructField("order_receipt_dt",LongType,true)
                                  ,StructField("maturity_dt",StringType,true))

        val dataDF =  spark.createDataFrame(spark.sparkContext.parallelize(data),StructType(schema))

        val modifedDf2= schema.foldLeft(dataDF) { case (newDF,StructField(name,dataType,flag,metadata)) =>
          newDF.withColumn(name,DataTypeUtil.transformLong(newDF,name,dataType.typeName))
modifedDf2,show
        }

      }


      val convertTimeStamp = udf { (manTs :java.sql.Timestamp) =>
        manTs.getTime
      }

      def transformLong(dataFrame: DataFrame,name:String, fieldType:String):Column = {
        import org.apache.spark.sql.functions._

        fieldType.toLowerCase match {

          case "timestamp"  => convertTimeStamp(dataFrame(name))
          case _ => dataFrame.col(name)
        }
      }

【问题讨论】:

  • 您能解释一下您要做什么以及遇到什么问题吗?
  • 你为什么不用unix_timestamp()

标签: scala dataframe apache-spark user-defined-functions apache-spark-dataset


【解决方案1】:

如果时间戳为空,您的 udf 可能会崩溃您可以这样做:

  • 使用 unix_timestamp 代替 UDF.. 或使您的 UDF 为空安全
  • 仅适用于需要转换的字段。

给定数据:

导入 spark.implicits._

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

val df = Seq(
  (1L,Timestamp.valueOf(LocalDateTime.now()),Timestamp.valueOf(LocalDateTime.now()))
).toDF("id","ts1","ts2")

你可以这样做:

val newDF = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
  .foldLeft(df)((df,field) => df.withColumn(field,unix_timestamp(col(field))))

newDF.show()

给出:

+---+----------+----------+
| id|       ts1|       ts2|
+---+----------+----------+
|  1|1589109282|1589109282|
+---+----------+----------+

【讨论】:

猜你喜欢
  • 2018-09-17
  • 2017-11-01
  • 2020-02-12
  • 1970-01-01
  • 2017-05-21
  • 1970-01-01
  • 1970-01-01
  • 2017-08-08
  • 1970-01-01
相关资源
最近更新 更多