【发布时间】:2017-05-02 19:24:44
【问题描述】:
我是 Scala 和 Spark 的新手,希望得到一些帮助来理解为什么下面的代码没有产生我想要的结果。
我在比较两张表
我想要的输出模式是:
case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)
当我手动逐步运行以下代码时,我实际上最终得到了我想要的结果。这是一个完全填充了我想要的输出的List[DiscrepancyData]。但是,我必须在下面的代码中遗漏一些东西,因为它返回一个空列表(在调用此代码之前,还有其他代码涉及从 HIVE 读取表、映射、分组、过滤等):
val compareCols = Set(year, nominal, adjusted_for_inflation, average_private_nonsupervisory_wage)
val key = "year"
def compare(table:RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
var discs: ListBuffer[DiscrepancyData] = ListBuffer()
def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
if (fieldOne != fieldTwo){
DiscrepancyData(
row1.getAs(key).toString, //fieldKey
colName, //fieldName
row1.getAs(colName).toString, //table1Value
row2.getAs(colName).toString, //table2Value
row2.getAs(colName).toString) //expectedValue
}
else null
}
def comparison() {
for(row <- table){
var elem1 = row._2.head //gets the first element in the iterable
var elem2 = row._2.tail.head //gets the second element in the iterable
for(col <- compareCols){
var value1 = elem1.getAs(col).toString
var value2 = elem2.getAs(col).toString
var disc = compareFields(value1, value2, col, elem1, elem2)
if (disc != null) discs += disc
}
}
}
comparison()
discs.toList
}
我这样调用上面的函数:
var outcome = compare(groupedFiltered)
这是groupedFiltered中的数据:
(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))
groupedFiltered 的表架构:
(year String,
nominal Double,
adjusted_for_inflation Double,
average_provate_nonsupervisory_wage String)
【问题讨论】:
标签: scala apache-spark bigdata