【Posted】:2017-02-11 00:01:00
【Question】:
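I have several RDDs of type RDD[(String, Int)]. I want to subtract the integer values by key.
Here is an example. If the input RDDs are
valid_record = (TcustomerTDL_2016266,16)
deleted_record = (TcustomerTDL_2016266,8)
then, because the keys are the same, the integer values should be subtracted. I tried "subtractByKey", but it does not seem to work. The expected result is (TcustomerTDL_2016266,8), i.e. 16 - 8 = 8.
I used the following code:
val changes_total = valid_record.subtractByKey(deleted_record)
If there is another way to do this, or if this approach is incorrect, please let me know.
The full code is as follows: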
val Conf = new SparkConf().setAppName("Module").setMaster("local")
val sc = new SparkContext(Conf)
// Read every file in the directory as a (path, content) pair
val incoming_file = sc.wholeTextFiles("D:/Users/Documents/siva_hourly") // changed code
// Key by the path segment at index 6 and split the content into lines
val output = incoming_file.map { case (k, v) => (k.split("/")(6), v.split("\\r?\\n")) }
output.cache()
// Keep only the third \001-delimited field (the change type) of each line
val change_type = output.map { case (k, v) => (k, v.toList.map(x => x.split("\001")(2))) } // changed code
// Count deletions ("D") per file, then shorten the key to its first two "_"-separated parts
val change_delete_count = change_type.map { case (k, v) => (k, v.count(x => x == "D")) }
val change_record_foreach4 = change_delete_count.map { case (k, v) => (k.split("_"), v) }
val change_record_foreach3 = change_record_foreach4.map { case (k, v) => (k(0) + "_" + k(1), v) }
// Count valid records ("A" or "I") per file and shorten the key the same way
val change_valid_count = change_type.map { case (k, v) => (k, v.count(x => x == "A" || x == "I")) }
val change_record_foreach = change_valid_count.map { case (k, v) => (k.split("_"), v) }
val change_record_foreach1 = change_record_foreach.map { case (k, v) => (k(0) + "_" + k(1), v) }
// Sum the counts per shortened key
val valid_record = change_record_foreach1.reduceByKey((x, y) => x + y)
val deleted_record = change_record_foreach3.reduceByKey((x, y) => x + y)
val changes_total = valid_record.subtractByKey(deleted_record)
【Discussion】:
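A likely reason the attempt returns nothing: subtractByKey does not subtract values. It returns the pairs of the first RDD whose keys do not occur in the second, so a key present in both RDDs is dropped entirely. One possible way to get the per-key difference is to join the two RDDs and subtract the values. The sketch below is only an illustration under assumptions: the names KeyedDifference and differenceByKey are hypothetical, and treating a key with no deletions as "subtract 0" is an assumed rule, not something stated in the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of the original code.
object KeyedDifference {

  // For every key in `valid`, subtract the count found in `deleted`.
  // Assumption: a key missing from `deleted` subtracts 0.
  def differenceByKey(valid: RDD[(String, Int)],
                      deleted: RDD[(String, Int)]): RDD[(String, Int)] =
    valid
      .leftOuterJoin(deleted) // RDD[(String, (Int, Option[Int]))]
      .mapValues { case (v, d) => v - d.getOrElse(0) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KeyedDifference").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      // Sample values taken from the question.
      val validRecord = sc.parallelize(Seq(("TcustomerTDL_2016266", 16)))
      val deletedRecord = sc.parallelize(Seq(("TcustomerTDL_2016266", 8)))

      // The key exists in both RDDs, so subtractByKey removes it: empty result.
      println(validRecord.subtractByKey(deletedRecord).collect().toList)

      // Join-based difference: List((TcustomerTDL_2016266,8))
      println(differenceByKey(validRecord, deletedRecord).collect().toList)
    } finally {
      sc.stop()
    }
  }
}
```

If only keys present in both RDDs should appear in the result, a plain join could be used instead of leftOuterJoin, with the value computed as v - d.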
Tags: scala apache-spark