【发布时间】:2022-03-08 15:51:52
【问题描述】:
我是 akka 的新手,但仍在尝试了解不同的 akka 和流式传输概念。对于一些新功能,我需要向正在处理内部对象的现有流添加 http 调用。像这样 -
val step1Flow = Flow[SampleObject].filter(...--Filtering condition--...)
val step2Flow = Flow[SampleObject].map(obj => {
...
-- Business logic to update values in the obj --
...
})
...
override val flowGraph: Flow[SampleObject, SampleObject, NotUsed] =
bufferIn.via(Flow.fromGraph(GraphDSL.create() {
implicit builder =>
import GraphDSL.Implicits._
...
val step1 = builder.add(step1Flow)
val step2 = builder.add(step2Flow)
val step3 = builder.add(step3Flow)
...
source ~> step1 ~> step2 ~> step3 ~> merge
...
}
我需要在 step1 之后添加新的 http 请求流(我们称之为 newFlow)。所有这些流都有 Inlet 和 Outlet 作为 SampleObject。现在我的理解是 newFlow 需要阻塞,因为出口只需是 SampleObject 。为此,我在 http 调用 future 上使用了 Await 函数。代码看起来像这样 -
val responseFuture: Future[(Try[HttpResponse], SomeContext)] =
Source
.single(httpRequest -> context)
.via(Retry(retrySettings).join(clientFlow))
.runWith(Sink.head)
...
val (httpTry, passedAlongContext) = Await.result(responseFuture, 30.seconds)
-- logic to process response and return SampleObject --
现在这工作正常,但我认为应该有更好的方法来做到这一点,而无需使用等待。另外我认为这会阻塞主线程直到请求完成,这会影响应用程序的吞吐量。 您能否指导我使用的方法是否正确。以及如何使用其他线程池来处理这些阻塞调用,这样我的主线程池不受影响
这个问题似乎与我的非常相似,但我并不完全理解 - connect Akka HTTP to Akka stream。我也不能改变 step2 或进一步的流程。
编辑:为流添加了一些代码细节
【问题讨论】:
-
向我们展示现有流是如何组合在一起的。
-
您需要为这个问题发布更多代码才能回答最一般的问题。您如何发出 HTTP 请求(例如,哪个库)?
标签: http akka threadpool blocking