【问题标题】:Execute list of Futures in parallel并行执行期货清单
【发布时间】:2015-03-10 08:56:26
【问题描述】:

我对 Scala 的 Future 和 Akka 非常陌生,目前,我正在尝试实现一个应用程序,该应用程序执行一系列独立任务并将结果收集在一起。

例如,我想要一个由多个任务组成的应用程序,每个任务接收一个数字,休眠几秒钟,然后返回“Hello”消息。

actor实现如下:

class HelloActor extends Actor {
  def receive = {
    case name:Int => {
      println("%s will sleep for %s seconds".format(name, name % 4))
      Thread.sleep(name % 4 * 1000)
      sender ! "Hello %s".format(name)
    }
  }
}

主要对象实现为:

object HelloAkka extends App {
  val system = ActorSystem("HelloSystem")

  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")

  implicit val timeout = Timeout(20, TimeUnit.SECONDS)

  val futures = (1 to 10).map(num => {
    helloActor ? num
  })

  val future = Future.sequence(futures)

  val results = Await.result(future, timeout.duration)

  println(results)

  system.shutdown
}

由于每个任务将休眠 0、1、2 或 3 秒,我希望首先执行休眠时间较短的任务。但是,结果是:

1 will sleep for 1 seconds
2 will sleep for 2 seconds
3 will sleep for 3 seconds
4 will sleep for 0 seconds
5 will sleep for 1 seconds
6 will sleep for 2 seconds
7 will sleep for 3 seconds
8 will sleep for 0 seconds
9 will sleep for 1 seconds
10 will sleep for 2 seconds
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10)

也就是说,所有的任务都是按顺序执行的。我想知道是否有任何方法可以让我并行执行所有任务。

【问题讨论】:

  • 您将所有请求发送给同一个参与者。来自/发往同一对参与者的消息保证按顺序执行。您可以将它们发送到 10 个不同的 actor 副本,也可以在 actor 内部的 Sleep 周围移动 Future 运算符。
  • @DiegoMartinoia 非常感谢,当我使用多个演员时它可以工作。您可以发表您的评论作为答案,以便我标记它吗?
  • 完成,感谢您的耐心等待!

标签: scala akka future


【解决方案1】:

如 cmets 中所述,您将所有任务/消息发送给一个参与者,并保证所有这些任务/消息将按顺序处理。

要并行处理任务,您需要有多个处理程序参与者实例,在您的情况下为HelloActor

当然,您可以只创建HelloActor 的多个实例,但这绝对不是好习惯。

对于此类任务,您应该使用内置路由功能,该功能允许您管理工作人员/处理程序池并通过一个 router 演员与他们进行交互,例如。

val router: ActorRef =
  context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router")

...
router ? num
...

请关注Akka Routing 文档以获取更多详细信息。

【讨论】:

    【解决方案2】:

    我建议在Future 中执行实际任务,而不是像 cmets 和答案中建议的那样启动多个参与者。所以你的演员更像是任务的协调者。例如:

    //...    
    
    // import pipe pattern to get access to `pipeTo` method
    import akka.pattern.pipe
    import scala.concurrent.Future
    
    // the `Future`s will be executed on this dispatcher
    // depending on your needs, you may want to create a 
    // dedicated executor for this
    class TaskCoordinatorActor extends Actor {
      import context.dispatcher
    
      def receive = {
        case name: Int =>
          Future {
            Thread.sleep(name % 4 * 1000)
            "Hello %s".format(name)
          } pipeTo sender()
      }
    }
    

    以上代码在scala.concurrent.Future 中执行您的任务,并将结果通过管道传递给原始发送者。这样,actor 在任务完成之前不会阻塞,而是在创建 Future 后准备好接收下一条消息。

    P.S.:您应该创建消息类型,而不是发送纯整数,从而明确您希望参与者做什么。在您的情况下,例如:

    case class Sleep(duration: Duration)
    

    【讨论】:

      【解决方案3】:

      从同一个actor发送到同一个actor的消息将按顺序执行。

      你有两个选择。

      要么为每条消息创建一个新的 HelloActor 副本,以便它们都并行执行,要么将你的 HelloActor 修改为如下内容(可能是错误的导入,凭记忆):

      import akka.pattern.pipe._
      
      class HelloActor extends Actor {
        def receive = {
          case name:Int => {
            println("%s will sleep for %s seconds".format(name, name % 4))
            Future(sleepAndRespond(name)) pipeTo sender
         }
       }
      
       def sleepAndRespond(name:String) = {
         Thread.sleep(name % 4 * 1000)
         "Hello %s".format(innerName)
       }
      }
      

      这样,执行的顺序部分只是未来的管道,然后为十条消息中的每一条异步执行。

      【讨论】:

      • @drexin 关于使用路由器的建议是一个非常好的建议,如果您采用多选方式。另请注意,在这两种情况下,您的性​​能都严格取决于您的线程池/调度程序的配置方式。查看文档了解更多详情!
      猜你喜欢
      • 1970-01-01
      • 2013-06-29
      • 2015-08-25
      • 2017-07-09
      • 2016-07-11
      • 2016-11-06
      • 2014-09-23
      • 2018-08-04
      • 2018-06-13
      相关资源
      最近更新 更多