【问题标题】:Streaming a ByteArrayOutputStream to an akka http response将 ByteArrayOutputStream 流式传输到 akka http 响应
【发布时间】:2022-01-06 06:47:39
【问题描述】:

我正在使用 ZIO 流创建一个 ByteArrayOutputStream,即:

lazy val byteArrayOutputStream = new ByteArrayOutputStream()
val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunks[String](_.flatMap(_.getBytes)
val data = ZStream.unwrap(callToFunction).run(sink)

这很好用 - 现在我需要使用 akka http 将这些数据流式传输回客户端。 我可以这样做:

val arr = byteArrayOutputStream.toByteArray
complete(HttpEntity(ContentTypes.`application/octet-stream`, arr)

这可行,但当然 toByteArray 将输出流带入内存,即我不流式传输数据。我遗漏了一些明显的东西 - 有没有简单的方法可以做到这一点?

【问题讨论】:

    标签: scala akka-http reactive-streams bytearrayoutputstream zio-streams


    【解决方案1】:

    您可以将输出流转换为 Akka Stream Source:

    val byteArrayOutputStream = new ByteArrayOutputStream()
    val source = StreamConverters.asOutputStream().mapMaterializedValue(_ => byteArrayOutputStream)
    

    然后简单地创建一个分块的 HTTP 实体:

    HttpResponse(entity = HttpEntity.Chunked.fromData(ContentTypes.`application/octet-stream`, source))
    

    更多关于分块传输:https://datatracker.ietf.org/doc/html/rfc7230#section-4.1

    对于 ZIO,您可能会使用如下内容:

    val zSource = ZStream.fromOutputStreamWriter(os => byteArrayOutputStream.writeTo(os))
    

    但是,您需要找到一种将 ZStream 转换为 Akka Stream Source 的方法。

    【讨论】:

    • github.com/zio/interop-reactive-streams 可能有助于在 ZStream 和 Akka Stream 之间进行转换
    • 感谢@Branislav 的回答。但是,当我实现并运行它时,请求永远不会返回。我从 API 中注意到它本质上是阻塞的,所以我创建了一个自定义调度程序 - 不幸的是再次没有乐趣。有什么我想念的吗?
    • @PJ Fanning - 是的,我看过了,但是例子不是很好,它使用了来自 reactive.org 的代码,该代码多年来没有更新 该网站不再可用.在公司环境中使用这种东西我很难证明其合理性
    猜你喜欢
    • 1970-01-01
    • 2018-02-19
    • 2023-04-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-08
    • 2011-01-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多