【问题标题】:Stream csv file to browser using akka stream and spray使用 akka 流和喷雾将 csv 文件流式传输到浏览器
【发布时间】:2015-11-05 07:16:56
【问题描述】:

如何将Source[String, Unit] 连接到流媒体演员?

我认为https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33 的修改版StreamingActor 会很好用,但我很难将各个部分联系起来。

鉴于source: Source[String, Unit]ctx: RequestContext,我认为修改后的StreamingActor 应该与actorRefFactory.actorOf(fromSource(source, ctx)) 连接。

以上要点供参考:

import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}

object StreamingActor {

  // helper methods

  def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
    fromHttpData(iterable.map(HttpData.apply), ctx)
  }
  def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
    Props(new StreamingActor(iterable, ctx))
  }

  // initial message sent by StreamingActor to itself
  private case object FirstChunk

  // confirmation that given chunk was sent to client
  private case object ChunkAck

}

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {

  import StreamingActor._

  def actorRefFactory = context

  val chunkIterator: Iterator[HttpData] = chunks.iterator

  self ! FirstChunk

  def receive = {

    // send first chunk to client
    case FirstChunk if chunkIterator.hasNext =>
      val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
      ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)

    // data stream is empty. Respond with Content-Length: 0 and stop
    case FirstChunk =>
      ctx.responder ! HttpResponse(entity = Empty)
      context.stop(self)

    // send next chunk to client  
    case ChunkAck if chunkIterator.hasNext =>
      val nextChunk = MessageChunk(chunkIterator.next())
      ctx.responder ! nextChunk.withAck(ChunkAck)

    // all chunks were sent. stop.  
    case ChunkAck =>
      ctx.responder ! ChunkedMessageEnd
      context.stop(self)

    //   
    case x => unhandled(x)
  }

}

【问题讨论】:

标签: scala akka spray akka-stream


【解决方案1】:

我认为您使用 StreamingActor 会使您试图解决的潜在问题过于复杂。此外,问题中的 StreamingActor 将生成多个 HttpResponse 值,每个块 1 个,单个 HttpRequest。这是低效的,因为您可以简单地返回 1 个带有 HttpEntity.Chunked 的 HttpReponse 作为数据流源的实体。

通用并发设计

Actor 用于状态,例如维护连接之间的运行计数器。即便如此,Agent 也涵盖了很多领域,并具有类型检查的额外好处(与 Actor.receive 不同,它在运行时将死信邮箱变成您唯一的类型检查器)。

并发计算,而不是状态,应该(按顺序)处理:

  1. Future 作为首要考虑因素:可组合、编译时类型安全检查以及大多数情况下的最佳选择。

  2. akka Streams:可组合,编译时类型安全检查,非常有用,但由于方便的背压功能,有很多overhead。 Steam 也是 HttpResponse 实体的形成方式,如下所示。

流式传输 CSV 文件

您的基本问题是如何使用 Streams 将 csv 文件流式传输到 http 客户端。您可以从创建数据源并将其嵌入到 HttpResponse 开始:

def lines() = scala.io.Source.fromFile("DataFile.csv").getLines()

import akka.util.ByteString
import akka.http.model.HttpEntity

def chunkSource : Source[HttpEntity.ChunkStreamPart, Unit] = 
  akka.stream.scaladsl.Source(lines)
                      .map(ByteString.apply)
                      .map(HttpEntity.ChunkStreamPart.apply)

def httpFileResponse = 
  HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource))

然后您可以为任何请求提供此响应:

val fileRequestHandler = {
  case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse
}   

然后将 fileRequestHandler 嵌入到您的服务器路由逻辑中。

【讨论】:

    猜你喜欢
    • 2016-08-02
    • 2010-10-11
    • 2013-04-18
    • 1970-01-01
    • 2010-09-14
    • 2012-07-02
    • 2015-05-22
    • 2012-02-17
    • 1970-01-01
    相关资源
    最近更新 更多