【问题标题】:Multi-threading for Scala stream production and processingScala 流生产和处理的多线程
【发布时间】:2014-12-11 08:50:52
【问题描述】:

假设我有一个相当标准的生产者/消费者问题要在 Scala 中使用这种结构进行编码:

  1. 构造延迟生成元素的StreamIterator
  2. StreamIterator 上使用mapforeach 来处理这些元素并对其进行处理。

这似乎工作得很好,但它看起来像是单线程的:当我们想要处理一个新元素时,我们要求生成它,并且在它生成之后,然后我们开始处理它。我真正想要的是一种在处理前一个元素时继续生成的机制。有没有办法让 Scala 做到这一点?

我知道我可以使用BlockingQueue,但这对我来说似乎非常必要。我希望有办法让Stream 继续在另一个线程上生成元素。

一旦我们提前生成它们,当然就不再是惰性求值了。但我也不想要预先生成整个流的热切评估。我想要 BlockingQueue 的类似物,但在功能范式中。

【问题讨论】:

  • 你应该看看反应流。见reactive-streams.org。使用 scala 和 akka 的实现刚刚达到 1.0 里程碑 1 状态。 groups.google.com/d/msg/akka-user/PPleJEfI5sM/EpSGOK2Pah4J。每个流处理阶段都依赖于自己的参与者,因此您应该能够获得非常好的并发性。从长远来看,这甚至应该允许您将流处理管道分布在多台机器上。
  • 有多种基于迭代的方法允许双方适当地异步。我认为 scalaz-stream 是目前在这个方向上的最大努力。 “连续”过程允许完全反应(尽管我没有直接使用它们);我所做的是通过将完全纯的Tasks 与unsafeStart 的明智使用结合起来“伪造”。
  • 我相信scalaz-stream 最接近您正在寻找的内容。 Akka 流还为 Akka Actor 提供了一个功能性包装器,用于流处理。两者最大的区别在于 Akka 流支持背压。

标签: multithreading scala stream producer-consumer


【解决方案1】:

您可以将流中的项目映射到这样的处理的未来:

def process(x: Int): Int = // do something time consuming
val asyncProducer = Stream.from(0).map(x => future { process(x)})

现在这不会产生任何结果,因为 Stream 在您尝试具体化它们之前不会生成项目,就像您建议您的流工作一样。因此,如果您想启动接下来 10 个项目的处理,您可以像这样简单地实现它们:

val futureResults = asyncProducer.take(10).toList

这将启动 10 个并行进程(取决于您在范围内的 ExecutionContext)并生成一个List[Future[Int]]。为了能够接收所有这些工作项,您可以将未来列表排序为列表的未来:

val futureResult = Future.sequence(futureResults)

现在这个未来,您可以映射以获取结果列表并将它们交给某个接收者并开始下一个处理块。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-17
    • 2013-06-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多