【问题标题】:Processing concurrently in Scala在 Scala 中同时处理
【发布时间】:2009-06-17 14:26:00
【问题描述】:

作为in my own answer to my own question,我有一种情况,我正在处理大量到达队列的事件。每个事件都以完全相同的方式处理,甚至可以独立于所有其他事件进行处理。

我的程序利用了 Scala 并发框架,其中涉及的许多进程都被建模为Actors。由于Actors 按顺序处理他们的消息,因此它们不太适合这个特定问题(即使我的其他参与者正在执行顺序的操作)。因为我希望 Scala 能够“控制”所有线程创建(我认为这是它首先拥有并发系统的关键),所以我似乎有两个选择:

  1. 将事件发送到我控制的事件处理器池
  2. 让我的Actor 通过其他机制同时处理它们

我原以为#1 否定了使用actors 子系统的意义:我应该创建多少个处理器actor?这是一个显而易见的问题。据说这些东西对我来说是隐藏的,由子系统解决。

我的回答是:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

有没有更好的方法?这是不正确的吗?

编辑:一个可能更好的方法是:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}

【问题讨论】:

  • 虽然演员似乎不直接支持工人池,但这个 Q 有助于暴露这个缺陷。我可以使用的所有文档,请不要明确提及。

标签: scala concurrency actor


【解决方案1】:

这似乎是另一个问题的重复。所以我会复制我的答案

Actor 一次处理一条消息。处理多条消息的经典模式是在一组消费者参与者的前面设置一个协调参与者。如果你使用 react,那么消费者池可能会很大,但仍然只会使用少量的 JVM 线程。下面是一个示例,我创建了一个由 10 个消费者和一个协调员组成的池。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

此代码测试查看哪个消费者可用并向该消费者发送请求。替代方案是随机分配给消费者或使用循环调度程序。

根据您正在做的事情,Scala 的 Futures 可能会为您提供更好的服务。例如,如果你真的不需要演员,那么上面所有的机器都可以写成

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))

【讨论】:

  • 谢谢 - 我不知道您可以在 Actor 框架使用的同一个调度程序上调用任务。因此,我认为最好的方法是使用 Scheduler.execute(process(e))
  • 也 - 是的;这是一个非常相似的问题(我链接到),但不完全相同。第一个问题是“演员是连续的吗?”而第二个问题是“由于演员是连续的,我该怎么做 X”
  • 顺便说一句:0 to 10 包含 11 个元素,而不是 10 个。
【解决方案2】:

如果事件都可以独立处理,为什么要排队?对您的设计一无所知,这似乎是一个不必要的步骤。如果您可以使用触发这些事件的任何内容来组合 process 函数,则您可能会避免队列。

演员本质上是配备队列的并发效果。如果你想同时处理多条消息,你真的不需要演员。您只希望在某个方便的时间安排一个函数 (Any => ()) 执行。

话虽如此,您的方法是合理的,如果您想留在演员库中并且事件队列不在您的控制范围内。

Scalaz 区分了 Actor 和并发效果。虽然它的Actor 非常轻巧,但scalaz.concurrent.Effect 仍然更轻。这是您的代码大致翻译到 Scalaz 库:

val eventProcessor = effect (x => process x)

这是最新的行李箱头,尚未发布。

【讨论】:

  • 谢谢!它们在“队列”上纯粹是因为我将它们发送给演员,而演员有一个队列,它按顺序处理。由于演员库是我 supposed 在 Scala 中处理并发 (*) 的方式,因此我正在尝试使用它。否则我只会使用 ExecutorService.invokeAll。
  • 另见我对上面 jschen 的评论。很长一段时间以来,我一直在用 Java 编写并发代码,并试图找到使用 actor 和,嗯,在预期是并发的 scala 程序中不使用 actor 之间的正确界限。
  • Actor 不是灵丹妙药,如果你想在 Scala 中实现并发,没有什么说你必须使用 Actor。它只是一个库,在我看来,是一个过于复杂的库。
【解决方案3】:

这听起来像是一个简单的消费者/生产者问题。我会使用一个带有消费者池的队列。您可能可以使用 java.util.concurrent 用几行代码来编写它。

【讨论】:

  • 使用 scala 演员库的全部意义在于它可以更好地将您的代码(使用演员编写)映射到当前操作环境中可用的并发性。因此,如果 Scala 认为它有 4 个处理器,那么它可能会为其具有 4 个工作人员的参与者创建一个支持线程池。通过创建自己的单独线程池来执行这项工作,我一无所获——我最终得到的只是一堆不必要的上下文切换。我完全知道如何在 Java 中解决这个问题 - 我正在询问如何使用 Scala 演员库解决它,因此是标签。
  • 对不起,我没有意识到这是一个演员的学术练习。我以为你想要一个很好的解决问题的方法。 “所以如果 Scala 认为它有 4 个处理器,也许它会为它的参与者创建一个有 4 个工作线程的后台线程池。”这可能是使用 java.util.concurrent 的两行代码,您可以从 scala 轻松使用。我一直从 jruby 使用它。
  • 是的,很好。但是当我将我的程序部署到一个 50 核的野兽上时,我已经将 2 个线程硬编码到其中。或者至少,它使actor子系统的工作方式完全没有意义,因为它现在不知道它可以创建多少线程。这不是一项学术练习,而是关于如何使用我不熟悉的技术最好地解决现实世界的问题。
  • 如果有一个非常好的解决方案你不感兴趣,因为你想尝试一些新想法,这是学术性的。这就是学术的定义!我从来没有建议你硬编码线程数。这是我目前正在研究的 jruby 程序的简化代码。 JavaConcurrent::Executors.newFixedThreadPool(JavaLang::Runtime.getRuntime.availableProcessors)
【解决方案4】:

actor(好吧,其中之一)的目的是确保actor内的状态一次只能被一个线程访问。如果消息的处理不依赖于actor中的任何可变状态,那么将任务提交给调度程序或线程池进行处理可能更合适。演员提供的额外抽象实际上妨碍了你。

在 scala.actors.Scheduler 中有方便的方法,或者您可以使用 java.util.concurrent 中的 Executor。

【讨论】:

    【解决方案5】:

    Actor 比线程轻得多,因此另一种选择是使用 Actor 对象,例如您习惯于提交到线程池的 Runnable 对象。主要区别在于您无需担心 ThreadPool - 线程池由 Actor 框架为您管理,主要是配置问题。

    def submit(e: MyEvent) = actor {
      // no loop - the actor exits immediately after processing the first message
      react {
        case MyEvent(x) =>
          process(x)
      }
    } ! e // immediately send the new actor a message
    

    然后提交消息,这样说:

    submit(new MyEvent(x))
    

    ,对应

    eventProcessor ! new MyEvent(x)
    

    从你的问题。

    通过在四核 i7 笔记本电脑上大约 10 秒内发送和接收 100 万条消息成功测试了此模式。

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 2019-11-05
      • 1970-01-01
      • 2010-11-01
      • 1970-01-01
      • 2010-11-03
      • 2014-07-15
      • 2017-06-12
      • 2017-05-14
      • 1970-01-01
      相关资源
      最近更新 更多