I'm not sure about your JSON definition (it is also missing some quotes and curly braces), or whether the record type is a column in the CSV, but here is a simplified version; you can wrap the "record type" logic around it if needed.
Assume a file validator.json:
{
  "fields": [
    { "name": "val1", "regex": "[0-9]+" },
    { "name": "val2", "regex": "[0-9]+" },
    { "name": "val3", "regex": "[A-Z]{2}" }
  ]
}
By default (with no extra schema options), spark.read.format("csv").option("header", "true").load("file.csv") will read every column in the file as a string. Here, assume you have a header val1,val2,val3 as the first line of the CSV. An equivalent DataFrame defined inline:
// requires import spark.implicits._ for toDF
val df = Seq(("1", "2", "ZZ"), ("2", "555", "KK")).toDF("val1", "val2", "val3")
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.databind.ObjectMapper
import scala.io.Source
val mapper = new ObjectMapper
mapper.registerModule(DefaultScalaModule)
// read the validator as one long string
val jsonString = Source.fromFile("validator.json").getLines.mkString("")
// map the JSON string into a nested structure; use a TypeReference so the
// generic parameters survive erasure (classOf would lose the nested types)
import com.fasterxml.jackson.core.`type`.TypeReference
val regexMap: Map[String, Seq[Map[String, String]]] =
  mapper.readValue(jsonString, new TypeReference[Map[String, Seq[Map[String, String]]]] {})
//val1 rlike '[0-9]+' AND val2 rlike '[0-9]+' AND val3 rlike '[A-Z]{2}'
val exprStr:String = regexMap("fields").map((fieldDef:Map[String, String]) => s"${fieldDef("name")} rlike '${fieldDef("regex")}'").mkString(" AND ")
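For reference, here is the same expression-building step in plain Scala, with the parsed JSON written inline as a literal (so it runs without Jackson or Spark); the names `fieldDefs` and `builtExpr` are illustrative, not part of the original code:

```scala
// Inline literal of the structure Jackson parses out of validator.json
val fieldDefs: Seq[Map[String, String]] = Seq(
  Map("name" -> "val1", "regex" -> "[0-9]+"),
  Map("name" -> "val2", "regex" -> "[0-9]+"),
  Map("name" -> "val3", "regex" -> "[A-Z]{2}")
)
// One rlike clause per field, joined with AND
val builtExpr: String = fieldDefs
  .map(f => s"${f("name")} rlike '${f("regex")}'")
  .mkString(" AND ")
```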
import org.apache.spark.sql.functions.expr
// this asks whether all rows match, using the expression built above
val matchingRowCount: Long = df.filter(expr(exprStr)).count
// if the counts match, then all of the rows follow the rules
df.count == matchingRowCount
// this adds a column about whether the row matches
df.withColumn("matches",expr(exprStr)).show
Result:
+----+----+----+-------+
|val1|val2|val3|matches|
+----+----+----+-------+
| 1| 2| ZZ| true|
| 2| 555| KK| true|
+----+----+----+-------+
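One caveat with this approach: Spark's rlike returns true when the pattern matches anywhere in the value, not only when it matches the whole value, so [0-9]+ would also accept "a1b". If a rule must cover the entire field, you can anchor each pattern when building the expression. A plain-Scala sketch of the difference, using Regex.findFirstIn to mimic rlike's substring semantics:

```scala
val regex = "[0-9]+"

// rlike-style: the pattern may match any substring, so "a1b" passes
val substringMatch = regex.r.findFirstIn("a1b").isDefined

// anchored: wrap the pattern so it must cover the whole value
val anchored = s"^(?:$regex)$$"
val fullMatch = anchored.r.findFirstIn("a1b").isDefined
```

Applied to the code above, that would mean wrapping each regex from the JSON as ^(?:...)$ inside the rlike clause.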