【发布时间】:2016-02-12 10:16:11
【问题描述】:
下面的代码包含reduceByKeyXXX 方法的各种单线程实现以及一些用于创建输入集和测量执行时间的辅助方法。 (随意运行main-方法)
reduceByKey(和 Spark 一样)的主要目的是减少具有相同键的键值对。示例:
scala> val xs = Seq( "a" -> 2, "b" -> 3, "a" -> 5)
xs: Seq[(String, Int)] = List((a,2), (b,3), (a,5))
scala> ReduceByKeyComparison.reduceByKey(xs, (x:Int, y:Int) ⇒ x+y )
res8: Seq[(String, Int)] = ArrayBuffer((b,3), (a,7))
代码
import java.util.HashMap
object Util {
def measure( body : => Unit ) : Long = {
val now = System.currentTimeMillis
body
val nowAfter = System.currentTimeMillis
nowAfter - now
}
def measureMultiple( body: => Unit, n: Int) : String = {
val executionTimes = (1 to n).toList.map( x => {
print(".")
measure(body)
} )
val avg = executionTimes.sum / executionTimes.size
executionTimes.mkString("", "ms, ", "ms") + s" Average: ${avg}ms."
}
}
object RandomUtil {
val AB = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
val r = new java.util.Random();
def randomString( len: Int ) : String = {
val sb = new StringBuilder( len );
for( i <- 0 to len-1 ) {
sb.append(AB.charAt(r.nextInt(AB.length())));
}
sb.toString();
}
def generateSeq(n: Int) : Seq[(String, Int)] = {
Seq.fill(n)( (randomString(2), r.nextInt(100)) )
}
}
object ReduceByKeyComparison {
def main(args: Array[String]) : Unit = {
implicit def iterableToPairedIterable[K, V](x: Iterable[(K, V)]) = { new PairedIterable(x) }
val runs = 10
val problemSize = 2000000
val ss = RandomUtil.generateSeq(problemSize)
println("ReduceByKey : " + Util.measureMultiple( reduceByKey(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKey2: " + Util.measureMultiple( reduceByKey2(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKey3: " + Util.measureMultiple( reduceByKey3(ss, (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKeyPaired: " + Util.measureMultiple( ss.reduceByKey( (x:Int, y:Int) ⇒ x+y ), runs ))
println("ReduceByKeyA: " + Util.measureMultiple( reduceByKeyA( ss, (x:Int, y:Int) ⇒ x+y ), runs ))
}
// =============================================================================
// Different implementations
// =============================================================================
def reduceByKey[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
val t = s.groupBy(x => x._1)
val u = t.map { case (k,v) => (k, v.map(_._2).reduce(fnc))}
u.toSeq
}
def reduceByKey2[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
val r = s.foldLeft( Map[A,B]() ){ (m,a) ⇒
val k = a._1
val v = a._2
m.get(k) match {
case Some(pv) ⇒ m + ((k, fnc(pv, v)))
case None ⇒ m + ((k, v))
}
}
r.toSeq
}
def reduceByKey3[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B) : Seq[(A,B)] = {
var m = scala.collection.mutable.Map[A,B]()
s.foreach{ e ⇒
val k = e._1
val v = e._2
m.get(k) match {
case Some(pv) ⇒ m(k) = fnc(pv, v)
case None ⇒ m(k) = v
}
}
m.toSeq
}
/**
* Method code from [[http://ideone.com/dyrkYM]]
* All rights to Muhammad-Ali A'rabi according to [[https://issues.scala-lang.org/browse/SI-9064]]
*/
def reduceByKeyA[A,B]( s: Seq[(A,B)], fnc: (B, B) ⇒ B): Map[A, B] = {
s.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce( fnc )))
}
/**
* Method code from [[http://ideone.com/dyrkYM]]
* All rights to Muhammad-Ali A'rabi according to [[https://issues.scala-lang.org/browse/SI-9064]]
*/
class PairedIterable[K, V](x: Iterable[(K, V)]) {
def reduceByKey(func: (V,V) => V) = {
val map = new HashMap[K, V]
x.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
map
}
}
}
在我的机器上产生以下结果
..........ReduceByKey : 723ms, 782ms, 761ms, 617ms, 640ms, 707ms, 634ms, 611ms, 380ms, 458ms Average: 631ms.
..........ReduceByKey2: 580ms, 458ms, 452ms, 463ms, 462ms, 470ms, 463ms, 465ms, 458ms, 462ms Average: 473ms.
..........ReduceByKey3: 489ms, 466ms, 461ms, 468ms, 555ms, 474ms, 469ms, 457ms, 461ms, 468ms Average: 476ms.
..........ReduceByKeyPaired: 140ms, 124ms, 124ms, 120ms, 122ms, 124ms, 118ms, 126ms, 121ms, 119ms Average: 123ms.
..........ReduceByKeyA: 628ms, 694ms, 666ms, 656ms, 616ms, 660ms, 594ms, 659ms, 445ms, 399ms Average: 601ms.
和 ReduceByKeyPaired 目前是最快的。
问题/任务
是否有更快的单线程 (Scala) 实现?
【问题讨论】:
-
不同意,我的问题既不是“代码高尔夫”也不是“谜题”。其次,游客密度较低。
-
我在这里引用 wiki “代码高尔夫是一种娱乐性的计算机编程竞赛,参赛者力争实现实现某种算法的最短源代码(不要与二进制大小编码混淆)。”那不是我要找的......我正在寻找最快的实现。
-
好吧,三个人意见一致……
-
“第二,访问者密度较低” 这不是选择网站发帖的好理由。或者我会在这里发布关于骑自行车的问题:) 但是,我也不认为它适合 codegolf。不过,可能值得进行代码审查。
-
伙计们,他要求更快的实施。这与 Code Golf 无关。
标签: scala mapreduce benchmarking key-value reduce