【问题标题】:Way to fetch data periodically in Scala without var/mutable collection在没有 var/mutable 集合的情况下在 Scala 中定期获取数据的方法
【发布时间】:2017-02-27 14:16:40
【问题描述】:

我想定期从某个来源获取数据,每小时一次。我这样做是因为获取数据需要花费大量时间,大约 10 分钟。所以,我缓存了这些数据。

我现在有这样的代码:

import java.util._

object Loader {
    @volatile private var map: Map[SomeKey, SomeValue] = Map()

    def start() {
        val timer = new Timer()
        val timerTask = new TimerTask {
            override def run() {
                reload()
            }
        }
        val oneHour = 60 * 60 * 1000
        timer.schedule(timerTask, oneHour)
    }

    def reload() {
        map = loadMap()
    }

    // this method invocation costs a lot, so, I cache it in reload()
    def loadMap(): Map[SomeKey, SomeValue] = ...

    def getValue(key: SomeKey): Option[SomeValue] = map.get(key)
}

另外,我的main() 函数中有Loader.start() 调用。

这很好用,但我想知道,有没有办法以更实用的方式编写它:摆脱 var 而不只使用可变集合?

【问题讨论】:

  • 如果您使用(易失性)varMap 不必是可变的(顺便说一句,上面的代码中似乎就是这种情况)
  • @BrunoGrieder 是的,Map 不必是可变的,它不是。问题是,我们能否在不使用可变映射的情况下以某种方式摆脱这个 var。我编辑了我的问题以便更清楚。
  • 如果你想改变状态并维护它,一些东西必须是可变的。也许您可以通过将 State 封装在 Monad Transformer 中并“携带”该状态来摆脱困境,但我什至无法想象这会是什么样子。 ScalaZ 专家可能会跳到这里
  • 对象(缓存)的状态会定期变化。使用可变结构(varmutable 集合)来表示这一点并没有错。甚至 Scala 库在其实现中也使用可变元素。要避免的是共享可变状态,但是本地的、非共享的可变状态很好。我知道这不是一个答案,但这个问题似乎比学术更实际,所以我觉得指出在这种情况下没有理由避免使用可变变量是恰当的。

标签: scala functional-programming scalaz


【解决方案1】:

组合 IO 库 scalaz-stream 在此类用例中成功地使您的代码远离可变性。一、依赖关系:

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.8",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.8",
  "org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"
)

我们从 scalaz-concurrentscalaz-stream 的一些导入开始:

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.time._
import scalaz.stream._
import scalaz.stream.Process.Env
import scalaz.stream.ReceiveY._

假设我们有一个能够提取其快照的命令式数据源。为了演示,它还会在每次提取时自行更新:

trait DataSource[Key, Value] {

  def loadMap: Map[Key, Value]
}

object DataSourceStub extends DataSource[Int, String] {

  private var externalSource: Map[Int, String]  = Map(1 -> "a")

  def loadMap: Map[Int, String] = {
    val snapshot = externalSource
    val key = snapshot.keys.max
    val value = snapshot(key)
    val (newKey, newValue) = (key + 1) -> (value + "a")
    val newSource = snapshot + (newKey -> newValue)
    externalSource = newSource
    snapshot
  }
}

现在,我们通过引入timer 来启动Loader 实现,它在启动时立即发出一个单元事件,然后每refreshEvery 秒发出一次。然后,可以通过将数据接收Task 映射到每个事件并在流中评估它们来获得报告我们的cacheStates 的事件流。困难的部分是交换:我们需要将Requests 流(使用缓存中的数据执行某些操作的函数)与我们的定期快照流交错。 scalaz-stream 提供了一个流交换工具,Wye,它允许我们说明我们将从输入流中处理事件的顺序。我们需要使用初始缓存快照,因此我们从wye.receiveL 开始,移动到具有初始缓存状态的handleImpl。现在,我们可以使用receiveBoth 接收任何事件:

  • 如果是缓存快照更新,我们会对其进行重复而不产生输出;
  • 如果它是一个请求,我们会为其提供当前缓存状态并将结果 Task 发送到输出中,同时在当前状态下重复出现;
  • 如果输入流之一终止,我们将停止处理。

剩下的唯一事情是将我们的输入与handleWye 连接起来,并将处理任务的副作用包含在流中,我们在processRequests 中这样做。

class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) {

  type CacheState = Map[Key, Value]
  type Request = CacheState => Task[Unit]
  type ReaderEnv = Env[CacheState, Request]

  implicit val scheduler: ScheduledExecutorService = DefaultScheduler

  private val timer: Process[Task, Unit] =
    Process.emit(()) ++ awakeEvery(refreshEvery).map(_ => ())

  private val cacheStates: Process[Task, CacheState] =
    timer.evalMap(_ => Task(dataSource.loadMap))

  private val handle: Wye[CacheState, Request, Task[Unit]] = {
    def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = {
      import wye._
      import Process._
      receiveBoth {
        case ReceiveL(i) => handleImpl(i)
        case ReceiveR(i) => emit(i(current)) ++ handleImpl(current)
        case HaltOne(rsn) => Halt(rsn)
      }
    }
    wye.receiveL[CacheState, Request, Task[Unit]](handleImpl)
  }

  def processRequests(requests: Process[Task, Request]): Process[Task, Unit] =
    cacheStates.wye(requests)(handle).eval
}

让我们通过向它发出 100 个最大 id 的数据请求(每个 100 毫秒)来测试我们的数据加载器,同时每秒执行一次刷新:

object TestStreamBatching {

  private val loader = new Loader(DataSourceStub, 1.second)

  private def request(cache: loader.CacheState): Task[Unit] = Task {
    Thread.sleep(100)
    val key = cache.keys.max
    val value = cache(key)
    println(value)
  }

  private val requests: Process[Task, loader.Request] =
    Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None)

  def main(args: Array[String]): Unit = {
    loader.processRequests(requests).run.unsafePerformSync
  }
}

通过运行它,您可以看到一个由'a' 字母组成的阶梯,每秒钟增加它的竖线大小,最终在输出 100 次后终止。

【讨论】:

  • 顺便说一句,在这种情况下我们也可以使用 akka-streams :)
  • 但是请注意,akka-streams 虽然是背压流的一个很好的实现,但它是一种命令式设计,不需要维护引用透明度,甚至没有这样的概念。在使用它实现函数式编程结构时,您需要仔细注意它不会在看似无效的调用中在后台执行任何操作(例如 Scala 的 Future 构造函数,它会立即启动提供的函数)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-02-03
  • 2013-10-10
  • 2020-04-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多