【问题标题】:why is my code not returning anything ? Scala fs2为什么我的代码没有返回任何东西?斯卡拉 fs2
【发布时间】:2020-07-07 05:11:59
【问题描述】:

该程序允许将 Mapping Ints 推送到 Double 并识别队列的退出时间。 该程序没有显示任何错误,但它没有打印任何内容。 我错过了什么?

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random
class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      _ <- b.storeInQueue.compile.drain

    } yield ()
    program.as(ExitCode.Success)
  }
}

【问题讨论】:

标签: scala queue fs2


【解决方案1】:

IO 是惰性求值的——要执行某些东西,它必须是创建最终 IO 值的表达式的一部分。

这里:

  def storeInQueue: Stream[IO, Unit] = {
    scheduledStream ... // no side effects are run when we create this!

    q1.dequeue ... // not using scheduledStream
  }

value scheduledStream 根本没有被使用,因此它不是从 storeInQueue 返回的值的“一部分”,所以当 IOApp 将 IO 值转换为计算时,您的程序的配方不包含消息推送到队列的部分,因此队列始终为空。

订阅队列的部分可以工作,但由于没有任何东西进入队列,它一直在等待永远不会到达的项目。

您必须通过“使它们成为一个 IO 值的一部分”来启动两个流,例如像这样:

class Tst(q1: Queue[IO, (Double, IO[Long])])(implicit timer: Timer[IO]) {

  val streamData = Stream.emit(1)
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  def storeInQueue =
    scheduledStream
      .map { n =>
        val entryTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n.toDouble, entryTime)
      }
      .through(q1.enqueue)
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))

  def takeFromQueue =
    q1.dequeue
      .evalTap(_ => timer.sleep(Random.between(10, 30).seconds))
      .map { n =>
        val exitTime =
          timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)
        (n._1, exitTime)
      }
      .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five2 extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, IO[Long])](1)
      b = new Tst(q)
      pushFiber <- b.storeInQueue.compile.drain.start // run as fiber
      pullFiber <- b.takeFromQueue.compile.drain.start // run as fiber
    } yield ()
    program.as(ExitCode.Success)
  }
}

【讨论】:

  • 但是如果流在离开刚刚创建的队列后应该被另一个队列再次使用怎么办?
  • 您可以重用队列,因为它们是封装在纯操作中的可变共享资源,但是如果您重用队列中的流,则意味着您正在对创建它们两次的队列运行相同的计算。
  • 如果我明白了,我应该从计算 1 返回一个 Queue[IO, Int] 而不是 Stream[IO, Int] 以便在下一次计算中使用它
  • 如果要复用队列,就复用队列。
  • 我要开一个新问题,因为我尝试重复使用它并失败了
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-04-10
  • 2023-02-07
相关资源
最近更新 更多