【发布时间】:2014-12-11 08:50:52
【问题描述】:
假设我有一个相当标准的生产者/消费者问题要在 Scala 中使用这种结构进行编码:
- 构造延迟生成元素的
Stream或Iterator。 - 在
Stream或Iterator上使用map或foreach来处理这些元素并对其进行处理。
这似乎工作得很好,但它看起来像是单线程的:当我们想要处理一个新元素时,我们要求生成它,并且在它生成之后,然后我们开始处理它。我真正想要的是一种在处理前一个元素时继续生成的机制。有没有办法让 Scala 做到这一点?
我知道我可以使用BlockingQueue,但这对我来说似乎非常必要。我希望有办法让Stream 继续在另一个线程上生成元素。
一旦我们提前生成它们,当然就不再是惰性求值了。但我也不想要预先生成整个流的热切评估。我想要 BlockingQueue 的类似物,但在功能范式中。
【问题讨论】:
-
你应该看看反应流。见reactive-streams.org。使用 scala 和 akka 的实现刚刚达到 1.0 里程碑 1 状态。 groups.google.com/d/msg/akka-user/PPleJEfI5sM/EpSGOK2Pah4J。每个流处理阶段都依赖于自己的参与者,因此您应该能够获得非常好的并发性。从长远来看,这甚至应该允许您将流处理管道分布在多台机器上。
-
有多种基于迭代的方法允许双方适当地异步。我认为 scalaz-stream 是目前在这个方向上的最大努力。 “连续”过程允许完全反应(尽管我没有直接使用它们);我所做的是通过将完全纯的
Tasks 与unsafeStart的明智使用结合起来“伪造”。 -
我相信
scalaz-stream最接近您正在寻找的内容。 Akka 流还为 Akka Actor 提供了一个功能性包装器,用于流处理。两者最大的区别在于 Akka 流支持背压。
标签: multithreading scala stream producer-consumer