【发布时间】:2018-05-07 11:13:25
【问题描述】:
假设我想将一些旧的异步 API 转换为 FS2 Streams。 API 提供了一个带有 3 个回调的接口:下一个元素、成功、错误。 我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。
FS2 指南 (https://functional-streams-for-scala.github.io/fs2/guide.html) 建议在这种情况下使用 fs2.Queue,
它非常适合入队,但是到目前为止我看到的所有示例都期望queue.dequeue 返回的流永远不会完成-
在我的情况下,没有明显的方法来处理成功/错误回调。
我尝试使用queue.dequeue.interruptWhen(...here goes the signal...),但如果成功/错误回调在客户端从流中读取数据之前到达,
流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。
FS2 可以做到这一点吗?使用 Akka Streams 很简单 - SourceQueueWithComplete 有 complete 和 fail 方法。
更新: 通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
但是,我是否忽略了更自然的处理方式?
【问题讨论】: