【问题标题】:FS2: is it possible to complete Queue gracefully?FS2:是否可以优雅地完成队列?
【发布时间】:2018-05-07 11:13:25
【问题描述】:

假设我想将一些旧的异步 API 转换为 FS2 Streams。 API 提供了一个带有 3 个回调的接口:下一个元素、成功、错误。 我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。

FS2 指南 (https://functional-streams-for-scala.github.io/fs2/guide.html) 建议在这种情况下使用 fs2.Queue, 它非常适合入队,但是到目前为止我看到的所有示例都期望queue.dequeue 返回的流永远不会完成- 在我的情况下,没有明显的方法来处理成功/错误回调。 我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但如果成功/错误回调在客户端从流中读取数据之前到达, 流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。

FS2 可以做到这一点吗?使用 Akka Streams 很简单 - SourceQueueWithCompletecompletefail 方法。

更新: 通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

但是,我是否忽略了更自然的处理方式?

【问题讨论】:

    标签: scala fs2


    【解决方案1】:

    一种惯用的方法是创建一个Queue[Option[A]] 而不是Queue[A]。入队时,包裹Some,您可以显式地将None 入队以表示完成。在出队方面,执行q.dequeue.unNoneTerminate,它会为您提供一个Stream[F, A],一旦队列发出None,它就会终止

    【讨论】:

      【解决方案2】:

      对您的更新的回答:将unNoneTerminaterethrow 结合起来,这需要一个Stream[F, Either[Throwable, A]] 并返回一个Stream[F, A],当它遇到一个throwable 时会出现Stream.raiseError 错误。

      然后,您的完整堆栈将是 Stream[F, Either[Throwable, Option[A]]],然后您通过调用 .rethrow.unNoneTerminate 解包到 Stream[F,A]

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2010-12-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-30
        • 2012-10-15
        • 2012-01-15
        相关资源
        最近更新 更多