【问题标题】:How can I combine two scalaz streams with a predicate selector?如何将两个 scalaz 流与谓词选择器结合起来?
【发布时间】:2016-07-13 13:32:12
【问题描述】:

我想将两个 scalaz 流与一个谓词结合起来,该谓词从任一流中选择下一个元素。例如,我希望这个测试通过:

val a = Process(1, 2, 5, 8)
val b = Process(3, 4, 5, 7)

choose(a, b)(_ < _).toList shouldEqual List(1, 2, 3, 4, 5, 5, 7, 8)

如您所见,我们不能像zip 这样聪明地对这两个元素进行排序,因为有时可能会连续选择其中一个进程。

我尝试了一个我认为可行的解决方案。它编译!但该死的,如果它什么都不做。 JVM 只是挂起 :(

import scalaz.stream.Process._
import scalaz.stream._

object StreamStuff {
  def choose[F[_], I](a:Process[F, I], b:Process[F, I])(p: (I, I) => Boolean): Process[F, I] =
    (a.awaitOption zip b.awaitOption).flatMap {
      case (Some(ai), Some(bi)) =>
        if(p(ai, bi)) emit(ai) ++ choose(a, emit(bi) ++ b)(p)
        else emit(bi) ++ choose(emit(ai) ++ a, b)(p)
      case (None, Some(bi)) => emit(bi) ++ b
      case (Some(ai), None) => emit(ai) ++ a
      case _ => halt
    }
}

请注意,以上是我的第二次尝试。在我的第一次尝试中,我尝试创建一个Tee,但我不知道如何取消使用失败者元素。我觉得我需要像这里一样的递归。

我正在使用流版本0.7.3a

非常感谢任何提示(包括增量提示,因为我想简单地学习如何自己解决这些问题)!

【问题讨论】:

    标签: scala scalaz scalaz-stream


    【解决方案1】:

    我将在下面给出一些提示和实现,因此如果您想自己制定解决方案,可能需要覆盖屏幕。

    免责声明:这只是我想到的第一种方法,而且我对 scalaz-stream API 的熟悉程度有点生疏,因此可能有更好的方法来实现此操作,这在某些可怕的情况下可能完全错误方式等。

    提示 1

    您可以在下一次递归调用中传递它们,而不是尝试“取消消耗”丢失的元素。

    提示 2

    您可以通过指明最后输掉的一方来避免累积多个失败元素。

    提示 3

    当我使用 Scalaz 流时,我经常发现首先使用普通集合来勾勒出一个实现更容易。这是列表所需的辅助方法:

    /**
     * @param p if true, the first of the pair wins
     */
    def mergeListsWithHeld[A](p: (A, A) => Boolean)(held: Either[A, A])(
      ls: List[A],
      rs: List[A]
    ): List[A] = held match {
      // Right is the current winner.
      case Left(l) => rs match {
        // ...but it's empty.
        case Nil => l :: ls
        // ...and it's still winning.
        case r :: rt if p(r, l) => r :: mergeListsWithHeld(p)(held)(ls, rt)
        // ...upset!
        case r :: rt => l :: mergeListsWithHeld(p)(Right(r))(ls, rt)
      }
      // Left is the current winner.
      case Right(r) => ls match {
        case Nil => r :: rs
        case l :: lt if p(l, r) => l :: mergeListsWithHeld(p)(held)(lt, rs)
        case l :: lt => r :: mergeListsWithHeld(p)(Left(l))(lt, rs)
      }
    }
    

    假设我们手头已经有一个失败的元素,但现在我们可以编写我们真正想要使用的方法:

    def mergeListsWith[A](p: (A, A) => Boolean)(ls: List[A], rs: List[A]): List[A] =
      ls match {
        case Nil => rs
        case l :: lt => rs match {
          case Nil => ls
          case r :: rt if p(l, r) => l :: mergeListsWithHeld(p)(Right(r))(lt, rt)
          case r :: rt            => r :: mergeListsWithHeld(p)(Left(l))(lt, rt)
        }
      }
    

    然后:

    scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) =>
         |   mergeListsWith[Int](_ < _)(ls.sorted, rs.sorted) == (ls ++ rs).sorted
         | }.check
    + OK, passed 100 tests.
    

    好的,看起来不错。我们可以为列表编写更好的方法,但这个实现与我们需要为 Process 做的形式相匹配。

    实施

    这里或多或少等同于 scalaz-stream:

    import scalaz.{ -\/, \/, \/- }
    import scalaz.stream.Process.{ awaitL, awaitR, emit }
    import scalaz.stream.{ Process, Tee, tee }
    
    def mergeWithHeld[A](p: (A, A) => Boolean)(held: A \/ A): Tee[A, A, A] =
      held.fold(_ => awaitR[A], _ => awaitL[A]).awaitOption.flatMap {
        case None =>
          emit(held.merge) ++ held.fold(_ => tee.passL, _ => tee.passR)
        case Some(next) if p(next, held.merge) =>
          emit(next) ++ mergeWithHeld(p)(held)
        case Some(next) =>
          emit(held.merge) ++ mergeWithHeld(p)(
            held.fold(_ => \/-(next), _ => -\/(next))
          )
      }
    
    def mergeWith[A](p: (A, A) => Boolean): Tee[A, A, A] =
      awaitL[A].awaitOption.flatMap {
        case None => tee.passR
        case Some(l) => awaitR[A].awaitOption.flatMap {
          case None =>               emit(l) ++ tee.passL
          case Some(r) if p(l, r) => emit(l) ++ mergeWithHeld(p)(\/-(r))
          case Some(r)            => emit(r) ++ mergeWithHeld(p)(-\/(l))
        }
      }
    

    让我们再检查一遍:

    scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) =>
         |   Process.emitAll(ls.sorted).tee(Process.emitAll(rs.sorted))(
         |     mergeWith(_ < _)
         |   ).toList == (ls ++ rs).sorted
         | }.check
    + OK, passed 100 tests.
    

    如果不进行更多测试,我不会将其投入生产,但它看起来很有效。

    【讨论】:

      【解决方案2】:

      您必须按照 Travis Brown 的建议实现自定义 T 恤。这是我的发球台implementation

      /*
        A tee which sequentially compares elements from left and right
        and passes an element from left if predicate returns true, otherwise
        passes an element from right.
       */
      def predicateTee[A](predicate: (A, A) => Boolean): Tee[A, A, A] = {
      
        def go(stack: Option[A \/ A]): Tee[A, A, A] = {
          def stackEither(l: A, r: A) =
            if (predicate(l, r)) emit(l) ++ go(\/-(r).some) else emit(r) ++ go(-\/(l).some)
      
          stack match {
            case None =>
              awaitL[A].awaitOption.flatMap { lo =>
                awaitR[A].awaitOption.flatMap { ro =>
                  (lo, ro) match {
                    case (Some(l), Some(r)) => stackEither(l, r)
                    case (Some(l), None) => emit(l) ++ passL
                    case (None, Some(r)) => emit(r) ++ passR
                    case _ => halt
                  }
                }
              }
            case Some(-\/(l)) => awaitR[A].awaitOption.flatMap {
              case Some(r) => stackEither(l, r)
              case None => emit(l) ++ passL
            }
            case Some(\/-(r)) => awaitL[A].awaitOption.flatMap {
              case Some(l) => stackEither(l, r)
              case None => emit(r) ++ passR
            }
          }
        }
      
        go(None)
      }
      
      val p1: Process[Task, Int] = Process(1, 2, 4, 5, 9, 10, 11)
      val p2: Process[Task, Int] = Process(0, 3, 7, 8, 6)
      
      p1.tee(p2)(predicateTee(_ < _)).runLog.run
      //res0: IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 7, 8, 6, 9, 10, 11)
      

      【讨论】:

        猜你喜欢
        • 2012-05-17
        • 1970-01-01
        • 1970-01-01
        • 2019-12-27
        • 2021-09-28
        • 2021-09-05
        • 1970-01-01
        • 2015-10-11
        相关资源
        最近更新 更多