【发布时间】:2022-01-15 19:02:00
【问题描述】:
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import spray.json._
import java.util.UUID
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}
object SoftwareRegistry extends App with Formatter {
implicit val system = ActorSystem("NPMRegistry")
implicit val materializer = ActorMaterializer()
case class NPMPackage(name: String)
// reading the packages
val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
val bufferedSource = scala.io.Source.fromFile(filename)
val listOfPackages: List[NPMPackage] = (for (line <- bufferedSource.getLines) yield {
NPMPackage(line.trim)
}).toList
bufferedSource.close()
// requests
val serverHttpRequests = listOfPackages.map(pkg =>
(HttpRequest(
HttpMethods.GET,
uri = Uri(s"/${pkg.name}")
),
UUID.randomUUID().toString)
)
// source
val sourceList = Source(serverHttpRequests)
val bufferedFlow = Flow[(HttpRequest, String)]
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.throttle(1, 3 seconds)
val dd = sourceList
.via(bufferedFlow).async
.via(Http().cachedHostConnectionPoolHttps[String]("registry.npmjs.org"))
.runForeach {
case (Success(response), oId) =>
println(s"$oId $response")
case (Failure(ex), oId) => println(ex)
}
在上面的代码中,我可以将响应打印到控制台,我想知道如何使用实体并以流的方式访问响应中的数据,而不是将来。
【问题讨论】:
-
“流式传输方式”到底是什么意思?您想如何使用收到的 HTTP 响应?
-
@Ava 我想在收到响应时使用它们。我之前尝试了另一种方法;它给出了未来的所有响应,我可以在未来完成后使用它们。这里我要一一回复
标签: json scala akka akka-stream akka-http