如果您愿意使用语言集成查询,您可以这样做,至少对于过滤而言。
对于包含以下内容的数据文件 dates.txt:
one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15
您可以根据需要在 UDF 中包含尽可能多的 Scala 日期魔法,但我会保持简单:
def myDateFilter(date: String) = date contains "-08-"
如下进行设置——其中很多来自Programming guide。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
// case class for your records
case class Entry(name: String, when: String)
// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))
您可以将 UDF 用作 WHERE 子句的一部分:
val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)
并查看结果:
augustEntries.map(r => r(0)).collect().foreach(println)
注意我使用的where 方法的版本,在文档中声明如下:
def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD
因此,UDF 只能采用一个参数,但您可以组合多个 .where() 调用来过滤多个列。
编辑 Spark 1.2.0(实际上也是 1.1.0)
虽然没有真正记录在案,但 Spark 现在支持注册 UDF,以便可以从 SQL 中查询。
上面的UDF可以使用:
sqlContext.registerFunction("myDateFilter", myDateFilter)
如果表已注册
sqlContext.registerRDDAsTable(entries, "entries")
可以用
查询
sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")
更多详情请见this example。