【问题标题】:Creating User Defined Function in Spark-SQL在 Spark-SQL 中创建用户定义函数
【发布时间】:2014-09-21 18:12:46
【问题描述】:

我是 spark 和 spark sql 的新手,我正在尝试使用 spark SQL 查询一些数据。

我需要从以字符串形式给出的日期中获取月份。

我认为直接从 sparkqsl 查询月份是不可能的,所以我想在 scala 中编写一个用户定义的函数。

是否可以在 sparkSQL 中编写 udf,如果可能,任何人都可以建议编写 udf 的最佳方法。

【问题讨论】:

标签: sql apache-spark


【解决方案1】:

如果您愿意使用语言集成查询,您可以这样做,至少对于过滤而言。

对于包含以下内容的数据文件 dates.txt:

one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15

您可以根据需要在 UDF 中包含尽可能多的 Scala 日期魔法,但我会保持简单:

def myDateFilter(date: String) = date contains "-08-"

如下进行设置——其中很多来自Programming guide

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// case class for your records
case class Entry(name: String, when: String)

// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))

您可以将 UDF 用作 WHERE 子句的一部分:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)

并查看结果:

augustEntries.map(r => r(0)).collect().foreach(println)

注意我使用的where 方法的版本,在文档中声明如下:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD

因此,UDF 只能采用一个参数,但您可以组合多个 .where() 调用来过滤多个列。

编辑 Spark 1.2.0(实际上也是 1.1.0)

虽然没有真正记录在案,但 Spark 现在支持注册 UDF,以便可以从 SQL 中查询。

上面的UDF可以使用:

sqlContext.registerFunction("myDateFilter", myDateFilter)

如果表已注册

sqlContext.registerRDDAsTable(entries, "entries")

可以用

查询
sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")

更多详情请见this example

【讨论】:

  • UDAF(用户自定义聚合函数)呢?
  • 我也一直想知道这一点,但到目前为止还没有发现任何证据表明它受到支持。如果您愿意编写 Hive 查询,则支持,如您在 the tests 中所见
  • 事实证明这是在SPARK-3947 中跟踪的——尚不支持。
  • 对于 Spark 1.3+ 使用 sqlContext.udf.register("myDateFilter", myDateFilter)
【解决方案2】:

在 Spark 2.0 中,您可以这样做:

// define the UDF
def convert2Years(date: String) = date.substring(7, 11)
// register to session
sparkSession.udf.register("convert2Years", convert2Years(_: String))
val moviesDf = getMoviesDf // create dataframe usual way
moviesDf.createOrReplaceTempView("movies") // 'movies' is used in sql below
val years = sparkSession.sql("select convert2Years(releaseDate) from movies")

【讨论】:

    【解决方案3】:

    PySpark 1.5 及以上版本中,我们可以通过内置函数轻松实现这一点。

    以下是一个例子:

    raw_data = 
    [
    
    ("2016-02-27 23:59:59", "Gold", 97450.56),
    
    ("2016-02-28 23:00:00", "Silver", 7894.23),
    
    ("2016-02-29 22:59:58", "Titanium", 234589.66)]
    
    
    Time_Material_revenue_df  = 
    sqlContext.createDataFrame(raw_data, ["Sold_time", "Material", "Revenue"])
    
    from pyspark.sql.functions import  *
    
    Day_Material_reveneu_df = Time_Material_revenue_df.select(to_date("Sold_time").alias("Sold_day"), "Material", "Revenue")
    

    【讨论】:

      猜你喜欢
      • 2019-11-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-13
      • 1970-01-01
      相关资源
      最近更新 更多