【发布时间】:2015-06-10 01:38:01
【问题描述】:
我正在使用 Spray 查询 REST 端点,该端点将返回大量数据,其中包含应处理的多个项目。数据是一系列 json 对象。有没有办法将响应转换为这些对象的流,而不需要我将整个响应读入内存?
阅读文档时提到了"chunked responses",这似乎与我想要的一致。如何在喷雾客户端管道中使用它?
【问题讨论】:
标签: spray chunked spray-client
我正在使用 Spray 查询 REST 端点,该端点将返回大量数据,其中包含应处理的多个项目。数据是一系列 json 对象。有没有办法将响应转换为这些对象的流,而不需要我将整个响应读入内存?
阅读文档时提到了"chunked responses",这似乎与我想要的一致。如何在喷雾客户端管道中使用它?
【问题讨论】:
标签: spray chunked spray-client
感谢http://boldradius.com/blog-post/VGy_4CcAACcAxg-S/streaming-play-enumerators-through-spray-using-chunked-responses 上的精彩文章,我今天刚刚实现了类似的功能。
本质上,您想要做的是在您的一个 Route 定义中获取 RequestContext,并获取对其“响应者”Actor 的引用。这是 Spray 将响应发送回发送原始请求的客户端的 Actor。
要发回分块响应,您必须发出响应开始的信号,然后一个接一个地发送块,最后发出响应已经完成的信号。您可以通过 spray.http 包中的 ChunkedResponseStart、MessageChunk 和 ChunkedMessageEnd 类执行此操作。
基本上我最终做的是将响应作为一系列这样的类发送:
0) 将一堆导入放入包含您的 Routes 的类中,以及一个案例对象:
import akka.actor.{Actor, ActorRef}
import spray.http._
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import spray.http.HttpData
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.{ActorContext, ActorRefFactory, Props}
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import spray.json.RootJsonFormat
import spray.http.MediaTypes._
object Messages {
case object Ack
}
1) 从您的 Route 中获取 requestContext:
path ("asdf") {
get { requestContext => {
... further code here for sending chunked response ...
}
}
2) 开始响应(作为 JSON 信封,在本例中将响应数据保存在名为“myJsonData”的 JSON 数组中):
responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(`application/json`, """{"myJsonData": ["""))).withAck(Ack))
3) 遍历您的结果数组,将它们的 JSON 化版本作为 JSON 数组中的元素发送到响应,逗号分隔直到发送最终元素 - 然后不需要尾随逗号:
requestContext.responder.forward(MessageChunk(HttpData(myArray.toJson).withAck(Ack))
if (!lastElement) { // however you work this out in your code!
requestContext.responder.forward(MessageChunk(HttpData(",").withAck(Ack))
}
4) 当没有东西要发送时,关闭 JSON 信封:
responder.forward(MessageChunk("]}").withAck(Ack))
并发出响应结束的信号:
responder.forward(ChunkedMessageEnd().withAck(Ack))
在我的解决方案中,我一直在使用 Play Iteratees 和 Enumerators,因此我没有在此处包含大量代码,因为它们与这些可能不适合您的需求的机制密切相关。 “withAck”调用的重点是,当网络发出可以接受更多块的信号时,这将导致响应者请求确认消息。理想情况下,您应该编写代码以等待将来返回 Ack 消息,然后再发送更多块。
我希望以上内容至少可以为您提供十个初学者,正如我所说,这些概念在我链接到的文章中得到了很好的解释!
谢谢, 邓肯
【讨论】: