【问题标题】:How to batch process functions with Scala Future's?如何使用 Scala Future 批处理函数?
【发布时间】:2016-12-01 01:59:18
【问题描述】:

我有一个函数我想调用 100 次,但我想以批处理的方式执行它,以便在任何时候只运行 2 个函数。这是因为该函数可能会对 Internet 连接造成高负载,因此最好以 2 个为一组对函数进行批处理。

这是我尝试使用 Scala Futures 进行的尝试,但似乎不起作用。有没有使用 Scala Futures 批处理任务列表的标准方法?

  def futureString(s:String): String = {
    Thread.sleep(2000)// + (Math.random()*1000).toInt)
    println(s"Completed $s")
    "end:" + s
  }

 def processList(list: List[String], blockSize: Int) = {
    var futuresProcessing = Set[Future[String]]()
    async {
      val itemIterator = list.iterator
      while (itemIterator.hasNext) {
        val item = itemIterator.next()
        println("Item is " + item)

        if (futuresProcessing.size >= blockSize) {
          await {
            val completed = Future.firstCompletedOf(futuresProcessing.toSeq)
            println("Size : " + futuresProcessing.size)
            completed
          }
        }

        val f = future { futureString(item) }
        f.onComplete{ case Success(sss) => { futuresProcessing = futuresProcessing - f } }
        futuresProcessing = futuresProcessing + f
      }
    }
  }

  val list: List[String] = (1 to 200).map(n => "" + n).toList
  processList(list, 2)

我想要的是我可以批量处理任何批量大小,futureString 可能会在随机的时间内完成。所以假设批量大小为 10,然后开始前 10 个项目,当一个项目完成时,应该将一个新项目添加到批次中进行处理。

我开始认为我应该使用演员。

更新:经过长时间的睡眠和清醒后,我开始使用它,但我认为使用 Actors 会更好。此外,我认为以下代码和 futuresProcessing Set 的使用存在一些竞争条件问题。

  import scala.concurrent._
  import scala.concurrent.duration._
  import ExecutionContext.Implicits.global
  import scala.async.Async.{async, await}
  import scala.collection.parallel.mutable
  import scala.util.{Success, Try}
  import scala.concurrent.Await

  def futureString(s:String): Future[String] = {
    future {
    Thread.sleep(2000 + (Math.random()*1000).toInt)
    println(s"Completed $s")
    "end:" + s
    }
  }

  def processList(list: List[String], blockSize: Int) = {
    val futuresProcessing = mutable.ParSet[Future[String]]()
    async {
      val itemIterator = list.iterator
      while (itemIterator.hasNext) {
        val item = itemIterator.next()
        println("Item is " + item)

        if (futuresProcessing.size >= blockSize) {
          await {
            val completed = Future.firstCompletedOf(futuresProcessing.toList)
            println("Size : " + futuresProcessing.size)
            completed
          }
        }

        val f = futureString(item)
        futuresProcessing += f
        f.onComplete{ case Success(sss) => { futuresProcessing -= f } }
      }
    }
  }

val list: List[String] = (1 to 200).map(n => "" + n).toList
processList(list, 4)

【问题讨论】:

  • 您介意使用 Java 线程和并发特性吗?在 IMO 那里实际上更容易。

标签: scala future


【解决方案1】:

如果您只关心并行处理 2 个批次,那么可能有一个更简单的解决方案:

val data = (1 to 20).map(_.toString()).grouped(2).toList

然后:

val result = data.flatMap(pair => pair.par.map(futureString))

产量:

// pause
Completed 1
Completed 2
// pause
Completed 4
Completed 3
// pause
Completed 6
Completed 5
// pause
Completed 8
Completed 7
// pause
Completed 9
Completed 10
// pause
// ..etc

result: List[String] = List(end:1, end:2, end:3, end:4, end:5, end:6, end:7, end
:8, end:9, end:10, end:11, end:12, end:13, end:14, end:15, end:16, end:17, end:1
8, end:19, end:20)

如果你想让它异步完成(因为上面的版本会阻塞),你可以将整个结果处理包装在一个 Future 中并等待它。

【讨论】:

  • 用 2 进行批处理是一个很好的答案。我只是希望有人可以回答更大的批量 N 情况。
  • @Phil 如果您想并行处理最多 3 个项目,则必须执行 grouped(3) 等。只要所有“组”必须相等,这不会不是问题。如果您想要拥有不同大小的组,这将是一个问题,但这是正确划分集合的问题......
  • 我不认为 OP 真正的意思是“两个批次”,一个线程空闲而第二个作业完成,但是“两个作业一直在进行中”(其中进行中意味着阻塞i/o)。
  • @som-snytt 哦,好吧...这是有道理的,可能就是这样。我猜我对措辞有点困惑。
【解决方案2】:

为了方便起见,让池定义并行度。

scala> import concurrent._
import concurrent._

scala> implicit val x = ExecutionContext fromExecutorService new java.util.concurrent.ForkJoinPool(2)
x: scala.concurrent.ExecutionContextExecutorService = scala.concurrent.impl.ExecutionContextImpl$$anon$1@6c9b0123

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(scala.concurrent.impl.Promise$DefaultPromise@397314a4, scala.concurrent.impl.Promise$DefaultPromise@32503873, scala.concurrent.impl.Promise$DefaultPromise@30aa1fd3, scala.concurrent.impl.Promise$DefaultPromise@710f6e9a, scala.concurrent.impl.Promise$DefaultPromise@2c267a73, scala.concurrent.impl.Promise$DefaultPromise@12312aaa, scala.concurrent.impl.Promise$DefaultPromise@59e8083a, scala.concurrent.impl.Promise$DefaultPromise@107445f3, scala.concurrent.impl.Promise$DefaultPromise@419c5cf5, scala.concurrent.impl.Promise$DefaultPromise@9afa7a, scala.concurrent.impl.Promise$DefaultPromise@3eb25fe5, scala.concurrent.impl.Promise$DefaultPromise@30b5d38b, scala.concurrent.impl.Promise$DefaultPromise@715363a8...
scala> End 1
Start 3
End 2
Start 4
End 3
Start 5

(等到 20)

在哪里

scala> val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@2ee5d440

scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit

编辑:默认ExecutionContext.global 池的配置记录在overview 中。它将使用系统属性scala.concurrent.context.maxThreadsminThreadsnumThreads 来约束可用线程。

还值得一提的是,希望对异步代码运行时间进行额外控制的人更喜欢多个库中提供的 Task 抽象。

$ scala -Dscala.concurrent.context.maxThreads=2
Welcome to Scala 2.12.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111).
Type in expressions for evaluation. Or try :help.

scala> import concurrent._, ExecutionContext.Implicits.global
import concurrent._
import ExecutionContext.Implicits.global

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
<console>:16: error: not found: value delay
       1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
                                                        ^

scala>  val ran = new java.util.Random()
ran: java.util.Random = java.util.Random@782be4eb

scala> def delay() = Thread sleep (1000L + (ran nextInt 10000))
delay: ()Unit

scala> 1 to 20 map (i => Future { println(s"Start $i"); delay; println(s"End $i"); i.toString })
Start 1
Start 2
res1: scala.collection.immutable.IndexedSeq[scala.concurrent.Future[String]] = Vector(Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>), Future(<not completed>))

scala> End 1
Start 3
End 2
Start 4
End 4
Start 5
End 5
Start 6
End 3
Start 7
End 7

【讨论】:

  • 如果对此处起作用的元素(例如 ForkJoinPool(2))进行更详细的解释会很有帮助。
  • @ThomasS 这是特定于池的;我会向读者提出有关他们选择的问题。但我很好奇默认池配置记录在哪里,所以我将添加该链接。
猜你喜欢
  • 2017-05-14
  • 2016-01-23
  • 2015-12-28
  • 1970-01-01
  • 2020-07-16
  • 2017-02-27
  • 2010-11-01
  • 1970-01-01
  • 2018-06-15
相关资源
最近更新 更多