【问题标题】:How to use asynchronous drivers in akka-streams map vs mapAsync如何在 akka-streams map 和 mapAsync 中使用异步驱动程序
【发布时间】:2017-04-05 22:46:25
【问题描述】:

我刚刚开始使用reactivecouchbase 异步数据库驱动程序,但遇到了一些基本的设计问题。 在传统的方法中,我会通过限制与数据库的连接数来限制我对数据库施加的压力。但是,使用异步驱动程序,我可以用新的查询淹没数据库吗?

这变得很重要的一个例子如下。

假设我有两种不同的方式调用数据库。

我的函数调用 DB:

asyncCallDB: Future[DBResponse]
blockingCallDB: DBResponse

现在我想将 db 调用映射到可以使用两个不同函数的流上:

Flow.map()
Flow.mapAsync(numberOfConcurrentCalls)()

现在我的问题是您将如何选择调用数据库:

Flow.map(blockingCallDB) //One call at a time with back preassure
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure?

Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure?

我觉得我在这里缺乏理解,想理解这种决定。

【问题讨论】:

标签: scala couchbase akka-stream


【解决方案1】:

ReactiveCouchbase 使用AsyncHttpClient 与 Couchbase 服务器进行通信。正如你可以see in the source code 它调用setMaximumConnectionsTotal,它限制了并发连接的数量。实际值取决于您在couchbase.http.maxTotalConnections 中配置的内容。

您创建的每个CouchbaseBucket 都有一个AsyncHttpClient。所以每个CouchbaseBucket最多有maxTotalConnections个连接。

来自Couchbase documentation on N1QL REST API

REST API 同步运行,因此一旦执行 请求开始,结果流回客户端, 语句执行完成时终止。

因此,实际上,每个存储桶的并发查询数限制为 maxTotalConnections

因此,DB 的背压总是以某种方式受到限制。要么是因为您将 maxTotalConnections 设置为非负数,要么是因为 RAM 或文件描述符数量有限,您的客户端无法创建更多连接。

但是,仍然有可能创建过多的Futures,这样您的客户端就会耗尽内存。每当您认为可能是这种情况时,您可能应该使用mapAsync,如in this answer 所述,或"Buffers and working with rate" (Akka documentation) 中提到的其他技术之一。

有一个good description of mapAsync in the Akka documentation

将传入元素传递给返回 Future 结果的函数。 什么时候 未来到达结果传递到下游。最多可以同时处理 n 个元素...

请记住,Flow.mapAsync 本身不会运行任何东西,它只会返回一个 Flow,您必须在 SourceSink 之间连接它,然后是 runAkka Quick Start Guide 以非常通俗易懂的方式描述了这一点。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2016-06-21
  • 1970-01-01
  • 2016-04-17
  • 2021-09-08
  • 1970-01-01
  • 2016-05-10
  • 1970-01-01
  • 2018-04-28
相关资源
最近更新 更多