【问题标题】:AKKA HTTP Source Streaming with FuturesAKKA HTTP 源流与期货
【发布时间】:2017-10-29 17:22:02
【问题描述】:

我在 Cassandra 表中分片了一个大型视频文件。我正在尝试通过使用Source 流将其流式传输回 API 客户端。

我的服务代码如下所示,

def getShards(id: String, shards: Int) = {
  def getShardsInternal(shardNo: Int, shards: Future[Array[Byte]]): Future[Array[Byte]] = {
    if (shardNo == 0) shards
    else getShardsInternal(shardNo - 1, shards.flatMap(x => Database.ShardModel.find(id, shardNo)))
  }
  getShardsInternal(shards, Future.successful(Array()))
}

在我的 AKKA HTTP 路由中,我尝试从返回的未来构建一个Source,如下所示,

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        val f = mediaService.getMetadata(id).flatMap { x =>
          mediaService.getShards(id, x.shards)
        }
        Source.fromFuture(f)
      }
    }
  }
}

我不确定Source.fromFuture 是如何做出响应的。被传递的未来本质上是一系列平面映射的未来,预计将按顺序执行。但是,我不相信这会以分块字节流的形式返回给客户端。

对此的任何指示将不胜感激。

编辑 1 我一直在尝试通过以下方式进一步缩小范围,

get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        Source.fromFuture {
          Future.successful("Hello".getBytes()).flatMap(x => Future.successful("World".getBytes()))
        }
      }
    }
  }
}

我期待这会返回

[72,101,108,108,111,32,87,111,114,108,100]

但是,我只得到最后一个未来的结果,如下所示,

[[87,111,114,108,100]]

亲切的问候 梅拉杰

【问题讨论】:

    标签: scala akka akka-stream akka-http


    【解决方案1】:

    将您的Source[Array[Byte], NotUsed] 转换为Source[ByteString, NotUsed],并将HttpEntityContentTypes 一起使用:

    import akka.util.ByteString
    
    def getAsset = get {
      pathPrefix("asset") {
        parameters('id) { id =>
          val f = mediaService.getMetadata(id).flatMap { x =>
            mediaService.getShards(id, x.shards)
          }
          val source = Source.fromFuture(f).map(ByteString.apply)
          complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
        }
      }
    }
    

    这里我以application/octet-stream 为例。由于您正在流式传输视频,因此您可能需要使用 ContentType.Binary 和适当的 media type。例如:

    complete(HttpEntity(ContentType.Binary(MediaTypes.`video/mpeg`), source))    
    

    针对您的评论和更新,您似乎希望在getShards 中连接期货的结果:正如您所发现的,flatMap 不会这样做。请改用Future.reduceLeft

    def getShards(id: String, shards: Int): Future[Array[Byte]] = {
      val futures = (1 to shards).map(Database.ShardModel.find(id, _))
      Future.reduceLeft(futures)(_ ++ _)
    }
    

    或者,您可以重新定义getShards 以返回Future[List[Array[Byte]]],然后使用flatMapConcat 创建Source,而不是将结果连接到单个数组中:

    def getShards(id: String, shards: Int): Future[List[Array[Byte]]] = {
      val futures = (1 to shards).map(Database.ShardModel.find(id, _)).toList
      Future.sequence(futures)
    }
    
    def getAsset = get {
      pathPrefix("asset") {
        parameters('id) { id =>
          val f = mediaService.getMetadata(id).flatMap { x =>
            mediaService.getShards(id, x.shards)
          }
          val source =
            Source.fromFuture(f)
                  .flatMapConcat(Source.apply)
                  .map(ByteString.apply)
          complete(HttpEntity(/* a content type */, source))
        }
      }
    }
    

    【讨论】:

    • 谢谢,那好像只是把最后一个future的结果发送给客户端。
    • @Meeraj:已更新。
    • 非常感谢,这似乎已经成功了。关于 Future 的 BTW 方法似乎是减少而不是 reduceLeft。此外,reduce 操作 (++) 是否会变得惰性,或者整个视频的字节数组是否会在提交到响应之前在内存中实现。我想要实现的是将每个分片的字节数组分块到响应流中。我找到了下面的链接,
    • @Meeraj:又更新了。
    猜你喜欢
    • 2018-08-28
    • 2017-01-30
    • 2018-02-19
    • 2018-07-08
    • 2013-04-12
    • 2015-07-07
    • 2017-07-17
    • 2019-06-25
    • 1970-01-01
    相关资源
    最近更新 更多