组合 IO 库 scalaz-stream 在此类用例中成功地使您的代码远离可变性。一、依赖关系:
libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % "7.2.8",
"org.scalaz" %% "scalaz-concurrent" % "7.2.8",
"org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"
)
我们从 scalaz-concurrent 和 scalaz-stream 的一些导入开始:
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.time._
import scalaz.stream._
import scalaz.stream.Process.Env
import scalaz.stream.ReceiveY._
假设我们有一个能够提取其快照的命令式数据源。为了演示,它还会在每次提取时自行更新:
trait DataSource[Key, Value] {
def loadMap: Map[Key, Value]
}
object DataSourceStub extends DataSource[Int, String] {
private var externalSource: Map[Int, String] = Map(1 -> "a")
def loadMap: Map[Int, String] = {
val snapshot = externalSource
val key = snapshot.keys.max
val value = snapshot(key)
val (newKey, newValue) = (key + 1) -> (value + "a")
val newSource = snapshot + (newKey -> newValue)
externalSource = newSource
snapshot
}
}
现在,我们通过引入timer 来启动Loader 实现,它在启动时立即发出一个单元事件,然后每refreshEvery 秒发出一次。然后,可以通过将数据接收Task 映射到每个事件并在流中评估它们来获得报告我们的cacheStates 的事件流。困难的部分是交换:我们需要将Requests 流(使用缓存中的数据执行某些操作的函数)与我们的定期快照流交错。 scalaz-stream 提供了一个流交换工具,Wye,它允许我们说明我们将从输入流中处理事件的顺序。我们需要使用初始缓存快照,因此我们从wye.receiveL 开始,移动到具有初始缓存状态的handleImpl。现在,我们可以使用receiveBoth 接收任何事件:
- 如果是缓存快照更新,我们会对其进行重复而不产生输出;
- 如果它是一个请求,我们会为其提供当前缓存状态并将结果
Task 发送到输出中,同时在当前状态下重复出现;
- 如果输入流之一终止,我们将停止处理。
剩下的唯一事情是将我们的输入与handleWye 连接起来,并将处理任务的副作用包含在流中,我们在processRequests 中这样做。
class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) {
type CacheState = Map[Key, Value]
type Request = CacheState => Task[Unit]
type ReaderEnv = Env[CacheState, Request]
implicit val scheduler: ScheduledExecutorService = DefaultScheduler
private val timer: Process[Task, Unit] =
Process.emit(()) ++ awakeEvery(refreshEvery).map(_ => ())
private val cacheStates: Process[Task, CacheState] =
timer.evalMap(_ => Task(dataSource.loadMap))
private val handle: Wye[CacheState, Request, Task[Unit]] = {
def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = {
import wye._
import Process._
receiveBoth {
case ReceiveL(i) => handleImpl(i)
case ReceiveR(i) => emit(i(current)) ++ handleImpl(current)
case HaltOne(rsn) => Halt(rsn)
}
}
wye.receiveL[CacheState, Request, Task[Unit]](handleImpl)
}
def processRequests(requests: Process[Task, Request]): Process[Task, Unit] =
cacheStates.wye(requests)(handle).eval
}
让我们通过向它发出 100 个最大 id 的数据请求(每个 100 毫秒)来测试我们的数据加载器,同时每秒执行一次刷新:
object TestStreamBatching {
private val loader = new Loader(DataSourceStub, 1.second)
private def request(cache: loader.CacheState): Task[Unit] = Task {
Thread.sleep(100)
val key = cache.keys.max
val value = cache(key)
println(value)
}
private val requests: Process[Task, loader.Request] =
Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None)
def main(args: Array[String]): Unit = {
loader.processRequests(requests).run.unsafePerformSync
}
}
通过运行它,您可以看到一个由'a' 字母组成的阶梯,每秒钟增加它的竖线大小,最终在输出 100 次后终止。