【问题标题】:How do I "split" a stream in fs2?如何在 fs2 中“拆分”流?
【发布时间】:2019-12-10 19:31:02
【问题描述】:

我想做这样的事情:

def splitStream[F, A](stream: fs2.Stream[F, A], split: A => B): (Stream[F, A], Stream[F, B)) = 
  (stream, stream.map(split)

但这不起作用,因为它从源头“拉”两次 - 当我耗尽 streamstream.map(split) 时,每次一次。我该如何防止这种情况?通过维护我自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?

【问题讨论】:

    标签: scala fs2


    【解决方案1】:

    通过维护我自己的内部缓冲区以某种方式切换到基于“推”的模型,这样我就不会拉两次?

    是的。例如,您可以使用 fs2 中的队列:

    def splitStream[F[_], A](stream: Stream[F, A], split: A => B): F[(Stream[F, A], Stream[F, B])] = 
      for {
        q <- Queue.noneTerminated[F, A]
      } yield (stream.evalTap(a => q.enqueue1(Some(a)).onFinalize(q.enqueue1(None)), q.dequeue.map(split))
    

    当然,这里的问题是,如果调用者忽略任何一个流,另一个流将死锁并且永远不会发出任何东西。这通常是您在尝试将一个流分成多个流时遇到的问题,并且无论何时订阅,都保证在每个子流中出现一个值。

    我通常采用的解决方案是组合更大的操作并使用broadcastparJoin 等运算符:

    def splitAndRun[F[_]: Concurrent, A](
      base: Stream[F, A],
      runSeveralThings: List[Stream[F, A] => Stream[F, Unit]]
    ): F[Unit] =
      base.broadcastTo(run: _*).compile.drain
    

    在这里,您知道您将拥有多少消费者,因此首先不会有忽略的流。

    【讨论】:

    • 我们能否以某种方式“复制”每个 Astream 并将一半发送到 stream,另一半发送到 steam.map(split)?这将解决不需要中间缓冲区的问题?
    • @pathikrit 的心智模型是不同的——你确实可以“复制”东西(stream.map(a =&gt; (a, a)) 是最愚蠢的例子,但也广播“复制”),但你不能发送 i> 东西变成一个流。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-06-14
    • 1970-01-01
    • 1970-01-01
    • 2021-09-01
    • 1970-01-01
    • 2010-11-06
    • 1970-01-01
    相关资源
    最近更新 更多