【问题标题】:How to stop Stream to evaluate next element and get accumulated result in a functional way如何停止 Stream 以评估下一个元素并以功能方式获得累积结果
【发布时间】:2014-11-13 14:00:07
【问题描述】:

我有这段代码,我想让 Stream 停止迭代并获得累积的结果。基本上,迭代是基于errorLimit数

sealed trait Ele

case class FailureEle() extends Ele
case class SuccessEle() extends Ele

type EitherResult = Either[IndexedSeq[Ele], Seq[FailureEle]]

 def parse(process: Process[Task, Ele], errorLimit: Int): EitherResult = {

  val errorAccumulator = new ListBuffer[FailureEle]
  val taskProcess = process.map(t => {
    t match {
      case x: FailureEle => errorAccumulator += x
      case _ => 
    }
    t
  }).takeWhile(_ => !(errorAccumulator.size == errorLimit))

    val voSeq = taskProcess.runLog.run

    if (errorAccumulator.isEmpty) {
      Left(voSeq)
    } else {
      Right(errorAccumulator)
    }

}
val result = Seq(FailureEle(), SuccessEle(), FailureEle(), SuccessEle(), SuccessEle(), FailureEle(), SuccessEle())

val adaptor = new SeqAdaptor[Ele](result)

val process: Process[Task, Ele] = Process
  .repeatEval(Task {adaptor.next()}).takeWhile(t => !t.shouldStop).map(_.get)

parse(process, 1).isRight //no SuccessEle will be iterated
parse(process, 2).isRight //only one SuccessEle will be iterated
parse(process, 3).isRight //the last one SuccessEle will not be iterated

它正在工作,但有几个问题我想重构 parse 方法以使其更实用:

  • ListBuffer 是一种强制方式

  • takeWhile 条件没有检查当前元素的逻辑,它仍在使用 ListBuffer 结果

所以我想知道是否有一种尾递归方式可以通过使用 ListBuffer 来替换命令式方式。

【问题讨论】:

    标签: scala functional-programming scalaz


    【解决方案1】:

    scan 可能还不够好,但有效

         sealed trait Ele
          case class FailureEle(e: Throwable) extends Ele
          case class SuccessEle(r: String) extends Ele
    
          def parse(p: Process[Task, Ele], error: Int): Process[Task, (Seq[SuccessEle],   Seq[FailureEle])] = {
                p.scan(Seq[SuccessEle]() -> Seq[FailureEle]()) { (r, e) =>
                  val (s, f) = r
                  e match {
                    case fail: FailureEle =>
                      s  -> (f :+ fail)
                    case succ: SuccessEle =>
                      (s :+ succ) -> f
                  }
                }.dropWhile { case (succ, fail) => fail.size < error }.take(1)
              }
    
            def test() {
                def randomFail = {
                  val nInt =  scala.util.Random.nextInt()
                  println("getting" +  nInt)
                  if(nInt % 5 == 0 )
                    FailureEle(new Exception("fooo"))
                  else
                    SuccessEle(nInt.toString)
                }
                val infinite = Process.repeatEval(Task.delay(randomFail))
                val r = parse(infinite, 3).runLast.run
                println(r)
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-01-12
      • 1970-01-01
      • 2017-06-29
      • 1970-01-01
      • 2016-11-22
      • 2015-08-20
      • 2016-03-18
      • 1970-01-01
      相关资源
      最近更新 更多