【问题标题】:Specify the processing time of tasks fs2 streams Scala指定任务 fs2 流 Scala 的处理时间
【发布时间】:2020-06-15 17:19:29
【问题描述】:

我正在尝试处理一些任务。 任务所花费的时间可能因元素而异。例如,从队列中存储和获取元素 1 可能需要 11 秒,而 2 可能需要 30 秒...

我已经尝试为此使用计时器,但我仍然得到 entryTime = ExitTime 我想知道我错过了什么。

这是我尝试过的:

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random

class Tst(q1: Queue[IO, (Double, String, String)])(implicit timer: Timer[IO]) {

  import core.Processing._

  def storeInQueue: Stream[IO, Unit] = {

    Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .map { n =>
        val entryTime = currentTimeNow
        (n.toDouble, "Service", entryTime)
      //  timer.sleep(Random.between(10, 30).seconds) I have tried adding it here but the same result
       }
      .through(q1.enqueue)  
  }

      def getFromQueue: Stream[IO, Unit] = {
        timer.sleep(Random.between(10, 30).seconds)
        q1.dequeue
          .map { n =>
            val exitTime = currentTimeNow
            (n._1, "Service", n._3, exitTime)
          }
          .evalMap(n => IO.delay(println(s"Pulling from queue $n")))
      }
    }

    object Five2 extends IOApp {

      override def run(args: List[String]): IO[ExitCode] = {
        val program = for {
          q <- Queue.bounded[IO, (Double, String, String)](10)
          b = new Tst(q)
          _ <- b.storeInQueue.compile.drain.start

          _ <- b.getFromQueue.compile.drain

        } yield ()
        program.as(ExitCode.Success)
      }
    }

currentTimeNow 方法给出:

def currentTimeNow: String = {
    val format = new SimpleDateFormat("dd-MM-yy hh:mm:ss")
    format.format(Calendar.getInstance().getTime())
  }

【问题讨论】:

  • 不,不是,我已经编辑了问题,并指定了 currentTimeNow 方法的实现。

标签: scala timer queue scheduling fs2


【解决方案1】:

得到 gitter 的帮助:这是答案: 你在那里做事的方式有几个问题。首先,您的 timer.sleep call 被忽略。 这是因为它返回一个 IO,IO 除非被评估,否则不会做任何事情。所以你想让它在你的流的管道中执行,你可以用类似的东西来做

q1.dequeue.evalTap(_ => timer.sleep(Random.between(10, 30).seconds))).map { n => … other things … }

第二件事是我不知道你的 currentTimeNow 函数是如何工作的,但通常在函数堆栈中获取当前时间是一个有效的操作,所以使用计时器你将获得当前时间更像这样:

Stream(1, 2, 3)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .evalMap { n =>
        timer.clock.realTime(java.util.concurrent.TimeUnit.MILLISECONDS)
        .map(entryTime => (n.toDouble, “Service”, entryTime))
      }
      .through(q1.enqueue)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-06-14
    • 1970-01-01
    • 1970-01-01
    • 2021-10-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-21
    相关资源
    最近更新 更多