【发布时间】:2015-03-21 18:00:06
【问题描述】:
我有一个如下所示的数据集,其中每个用户和产品 ID 都是一个字符串:
userA, productX
userA, productX
userB, productY
拥有约 280 万种产品和 3 亿用户;大约 21 亿个用户-产品关联。
我的最终目标是在此数据集上运行 Spark 协同过滤 (ALS)。由于它需要用户和产品的 int 键,所以我的第一步是为每个用户和产品分配一个唯一的 int,并转换上面的数据集,以便用户和产品由 int 表示。
这是我迄今为止尝试过的:
val rawInputData = sc.textFile(params.inputPath)
.filter { line => !(line contains "\\N") }
.map { line =>
val parts = line.split("\t")
(parts(0), parts(1)) // user, product
}
// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()
// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()
idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)
// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
case (id1, (id2s, idx1s)) =>
val idx1 = idx1s.head
id2s.map { (_, idx1)
}
}
// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
case (id2, (idx1s, idx2s)) =>
val idx2 = idx2s.head
idx1s.map{ (_, idx2)
}
}
// save output
val convertedInts = converted.map{
case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)
当我尝试在我的集群(40 个执行程序,每个执行程序,每个 5 GB RAM)上运行它时,它能够很好地生成 idx1map 和 idx2map 文件,但它会因内存不足错误而失败,并且在 cogroup 之后的第一个 flatMap 处获取失败.我以前对 Spark 做的不多,所以我想知道是否有更好的方法来实现这一点;我不知道这项工作的哪些步骤会很昂贵。当然,cogroup 需要在整个网络中对整个数据集进行洗牌;但是这样的事情是什么意思?
FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)
我不只是使用散列函数的原因是我最终希望在更大的数据集上运行它(大约 10 亿个产品、10 亿个用户、350 亿个关联),以及Int 键冲突会变得非常大。在这种规模的数据集上运行 ALS 是否接近可行?
【问题讨论】:
标签: apache-spark