签名
区别在signatures 中最突出:Flow.map 接受一个返回类型T 的函数,而Flow.mapAsync 接受一个返回类型Future[T] 的函数。
实例
例如,假设我们有一个函数,它根据用户 ID 查询数据库以获取用户的全名:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
给定一个 UserID 的 akka 流 Source 值,我们可以在流中使用 Flow.map 来查询数据库并将全名打印到控制台:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
这种方法的一个限制是该流一次只能进行 1 个 db 查询。这种串行查询将成为“瓶颈”,并且可能会阻止我们的流中的最大吞吐量。
我们可以尝试通过使用Future 的并发查询来提高性能:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
这个简单的附录的问题在于我们有效地消除了背压。
Sink 只是拉入 Future 并添加了一个foreach println,与数据库查询相比,速度相对较快。该流将不断地将需求传播到源并在Flow.map 内产生更多期货。因此,并发运行的databaseLookup 的数量没有限制。不受约束的并行查询最终可能会使数据库过载。
Flow.mapAsync 救援;我们可以同时访问数据库,同时限制同时查找的数量:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
还要注意Sink.foreach 变得更简单了,它不再接受Future[FullName],而是只接受FullName。
无序异步映射
如果不需要将 UserID 保持到 FullName 的顺序,那么您可以使用 Flow.mapAsyncUnordered。例如:您只需将所有名称打印到控制台,而不关心它们的打印顺序。