【发布时间】:2015-02-08 17:28:15
【问题描述】:
我的表中有以下列 [col1,col2,key1,col3,txn_id,dw_last_updated]。在这些 txn_id 中, key1 是主键列。在我的数据集中,我可以有多个记录用于(txn_id,key)的组合。在这些记录中,我需要根据 dw_last_updated 选择最新的一条..
我正在使用这个逻辑。我一直遇到内存问题,我相信部分原因是 groupByKey()... 有更好的替代方案吗?
case class Fact(col1: Int,
col2: Int,
key1: String,
col3: Int,
txn_id: Double,
dw_last_updated: Long)
sc.textFile(s3path).map { row =>
val parts = row.split("\t")
Fact(parts(0).toInt,
parts(1).toInt,
parts(2),
parts(3).toInt,
parts(4).toDouble,
parts(5).toLong)
}).map { t => ((t.txn_id, t.key1), t) }.groupByKey(512).map {
case ((txn_id, key1), sequence) =>
val newrecord = sequence.maxBy {
case Fact_Cp(col1, col2, key1, col3, txn_id, dw_last_updated) => dw_last_updated.toLong
}
(newrecord.col1 + "\t" + newrecord.col2 + "\t" + newrecord.key1 +
"\t" + newrecord.col3 + "\t" + newrecord.txn_id + "\t" + newrecord.dw_last_updated)
}
感谢您的想法/建议...
【问题讨论】:
-
恐怕我觉得不错。你可以尝试更多的分区。但也许你只是需要更多的机器。
-
这还没有解决吗?
标签: scala apache-spark