【问题标题】:What is the scalaz-stream equivalent to Play Framework's Enumerator.fromCallback什么是等效于 Play Framework 的 Enumerator.fromCallback 的 scalaz-stream
【发布时间】:2014-04-12 17:51:10
【问题描述】:

Play Framework 的 iteratee 库定义了一个方法 Enumerator.fromCallback,它允许根据 Future 的结果生成元素:

http://www.playframework.com/documentation/2.2.x/Enumerators

def fromCallback[E](
  retriever: () => Future[Option[E]],
  onComplete: () => Unit = () => (),
  onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E]

你可以在这里看到一个很好的例子,它被用来从 Web 服务传递分页结果:

http://engineering.klout.com/2013/01/iteratees-in-big-data-at-klout/

def pagingEnumerator(url:String):Enumerator[JsValue]={
  var maybeNextUrl = Some(url) //Next url to fetch
  Enumerator.fromCallback[JsValue] ( retriever = {
    val maybeResponsePromise =
      maybeNextUrl map { nextUrl=>  
        WS.url(nextUrl).get.map { reponse =>
          val json = response.json
          maybeNextUrl = (json \ "next_url").asOpt[String]
          val code = response.status //Potential error handling here
          json
        }   
      }

    /* maybeResponsePromise will be an Option[Promise[JsValue]].
     * Need to 'flip' it, to make it a Promise[Option[JsValue]] to
     * conform to the fromCallback constraints */
    maybeResponsePromise match {
      case Some(responsePromise) => responsePromise map Some.apply
      case None => PlayPromise pure None
    }
  })
}

执行相同操作的等效 scalaz-stream 代码是什么?我很确定它可以使用Process.emitProcess.awaitProcess.eval 来完成,但我很想看到一个可行的例子。这可能还需要将 scala Future 提升为 scalaz Task,这里有一个答案:

Convert scala 2.10 future to scalaz.concurrent.Future // Task

如果它使事情变得更简单,我们可以忽略 scala Future vs scalaz Task 位并假设我们有一个任务。

【问题讨论】:

标签: scala scalaz scalaz-stream


【解决方案1】:

要从 scala.concurrent.Future 获取 scalaz.concurrent.Task,你可以使用 Task.async,当你手头有任务时,你可以这样做:

  import java.util.concurrent.atomic.AtomicInteger
  import scalaz.concurrent.Task
  import scalaz.stream.Process.End
  import scalaz.stream._

  val cnt = new AtomicInteger(0)

  val task: Task[String] = Task {
    if (cnt.incrementAndGet() <= 10) s"Task ${cnt.get}" else throw End
  }

  Process.repeatEval(task).runLog.run.foreach(println)

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-09-21
  • 1970-01-01
  • 2012-09-30
  • 1970-01-01
  • 2019-06-07
  • 1970-01-01
相关资源
最近更新 更多