【发布时间】:2016-12-01 01:59:18
【问题描述】:
我有一个函数我想调用 100 次,但我想以批处理的方式执行它,以便在任何时候只运行 2 个函数。这是因为该函数可能会对 Internet 连接造成高负载,因此最好以 2 个为一组对函数进行批处理。
这是我尝试使用 Scala Futures 进行的尝试,但似乎不起作用。有没有使用 Scala Futures 批处理任务列表的标准方法?
def futureString(s:String): String = {
Thread.sleep(2000)// + (Math.random()*1000).toInt)
println(s"Completed $s")
"end:" + s
}
def processList(list: List[String], blockSize: Int) = {
var futuresProcessing = Set[Future[String]]()
async {
val itemIterator = list.iterator
while (itemIterator.hasNext) {
val item = itemIterator.next()
println("Item is " + item)
if (futuresProcessing.size >= blockSize) {
await {
val completed = Future.firstCompletedOf(futuresProcessing.toSeq)
println("Size : " + futuresProcessing.size)
completed
}
}
val f = future { futureString(item) }
f.onComplete{ case Success(sss) => { futuresProcessing = futuresProcessing - f } }
futuresProcessing = futuresProcessing + f
}
}
}
val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 2)
我想要的是我可以批量处理任何批量大小,futureString 可能会在随机的时间内完成。所以假设批量大小为 10,然后开始前 10 个项目,当一个项目完成时,应该将一个新项目添加到批次中进行处理。
我开始认为我应该使用演员。
更新:经过长时间的睡眠和清醒后,我开始使用它,但我认为使用 Actors 会更好。此外,我认为以下代码和 futuresProcessing Set 的使用存在一些竞争条件问题。
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import scala.async.Async.{async, await}
import scala.collection.parallel.mutable
import scala.util.{Success, Try}
import scala.concurrent.Await
def futureString(s:String): Future[String] = {
future {
Thread.sleep(2000 + (Math.random()*1000).toInt)
println(s"Completed $s")
"end:" + s
}
}
def processList(list: List[String], blockSize: Int) = {
val futuresProcessing = mutable.ParSet[Future[String]]()
async {
val itemIterator = list.iterator
while (itemIterator.hasNext) {
val item = itemIterator.next()
println("Item is " + item)
if (futuresProcessing.size >= blockSize) {
await {
val completed = Future.firstCompletedOf(futuresProcessing.toList)
println("Size : " + futuresProcessing.size)
completed
}
}
val f = futureString(item)
futuresProcessing += f
f.onComplete{ case Success(sss) => { futuresProcessing -= f } }
}
}
}
val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 4)
【问题讨论】:
-
您介意使用 Java 线程和并发特性吗?在 IMO 那里实际上更容易。