【问题标题】:How to chain Futures for async database I/O handling?如何链接 Futures 以进行异步数据库 I/O 处理?
【发布时间】:2015-03-18 01:43:43
【问题描述】:

我最近开始在 Play Scala 中开发应用程序。虽然我已经在多个应用程序中使用过 Play Java,但我对 Scala 和 Play Scala 还是很陌生。

我使用 DAO 模式来抽象数据库交互。 DAO 包含用于插入、更新删除的方法。在阅读了异步和线程池相关文档后,我认为使数据库交互异步非常重要,除非您将 Play 默认线程池调整为拥有多个线程。

为了确保所有数据库调用都是异步处理的,我进行了所有调用以返回 Future 而不是直接返回值。我为数据库交互创建了一个单独的执行上下文。

trait Dao[K, V] {
  def findById(id: K): Future[Option[V]]
  def update(v: V): Future[Boolean]
  [...]
}

这导致动作中的代码非常复杂且深度嵌套。

trait UserDao extends Dao[Long, User] {
  def existsWithEmail(email: String): Future[Boolean]
  def insert(u: User) Future[Boolean]
}

object UserController extends Controller {
  def register = Action {
    [...]
    userDao.existsWithEmail(email).flatMap { exists =>
      exits match {

        case true =>
          userDao.insert(new User("foo", "bar")).map { created =>
            created match {
              case true => Ok("Created!")
              case false => BadRequest("Failed creation")
            }
          }

        case false =>
          Future(BadRequest("User exists with same email"))
      }
    }
  }
}

以上是最简单的操作示例。随着我涉及更多的数据库调用,嵌套级别变得更深。虽然我认为使用 for 理解可以减少一些嵌套,但我怀疑我的方法本身是否从根本上是错误的?

考虑我需要创建用户的情况,

一个。如果没有相同的电子邮件地址已经存在。

b.如果没有相同的手机号码已经存在。

我可以创造两个未来,

f(a) 通过电子邮件检查用户是否存在。

f(b) 检查用户是否使用移动设备。

除非我验证这两个条件都为 false,否则我无法插入新用户。我实际上可以让 f(a) 和 f(b) 并行运行。如果 f(a) 评估为真,则并行执行可能是不可取的,否则可能会有利。创建用户的第 3 步取决于这两个未来,所以我想知道跟随是否同样好?

trait UserDao extends Dao[Long, User] {
  def existsWithEmail(email: String): Boolean
  def existsWithMobile(mobile: String): Boolean
  def insert(u: User): Unit
}

def register = Action {
  implicit val dbExecutionContext = myconcurrent.Context.dbExceutionContext

  Future {
    if (!userDao.existsWithEmail(email) && !userDao.existsWithMobile(mobile) {
      userDao.insert(new User("foo", "bar")
      Ok("Created!")
    } else {
      BadRequest("Already exists!")
    }
  }
}

哪种方法更好?使用单个 Future 多次调用数据库的方法有什么缺点吗?

【问题讨论】:

  • 您可以使用for/yield 糖更轻松地处理返回期货的连续调用。此外,当您的整个地图功能是模式匹配时,您可以直接放入案例,即myFuture.flatMap { case ... }。像traverse(来自scalaz)这样的高阶函数也可以派上用场。我不知道 Play 是否与 Spray 的 onComplete 有任何类比(这使得在该框架中使用 Futures 非常容易)。对数据库进行多次调用的单个 future 可能很好,但往往会导致将业务逻辑放在更难测试的 dao 中。
  • 坦率地说,在您确定数据库访问是您的瓶颈之前,我不会为此烦恼。
  • @lmm 我不是说把业务逻辑放到Dao里。我的意思是在动作中创建一个 Future,然后在其中嵌入多个调用。 for / yield 当调用不相互依赖时才有意义。没有?
  • for/yield 当调用相互依赖时更有意义,IMO,尽管它在任何一种情况下都有效。如果您将 Future 创建放在 Actions 中,那么这可能比将其放在 dao 中更重复,因为您可能会有许多不同的 Actions 调用较少数量的常见 dao 操作?但是,如果您不介意代码开销,那应该没问题。
  • @Ryan 我的虚拟服务器实例只有 1 个核心(不想在前期花费太多),这意味着除非我将线程池调整为像传统的那样(数十个线程),否则我只能一个线程。我的想法是在框架提供异步功能时使用它。如果我无法弄清楚,同步返回是我的后备。

标签: scala playframework-2.0


【解决方案1】:

你说的对,for 推导式可以减少嵌套。

要解决双重未来问题,请考虑:

existsWithEmail(email).zip(existsWithMobile(mobile)) map {
  case (false, false) => // create user
  case _              => // already exists
}

如果你有很多这些,你可以使用Future.sequence( Seq(future1, future2, ...) )将一个future序列变成一个future序列。

您可能想查看比 DAO 更多的用于 DB 访问的函数式惯用语,例如 SlickAnorm。通常,它们会比 DAO 更好地组合并最终变得更灵活。

附注:使用if/else 进行简单的真/假测试比使用match/case 更有效,并且是首选样式。

【讨论】:

  • 感谢您的建议。 zipFuture.sequence 确实很有趣。我实际上正在使用 Anorm,并且我的 DAO 映射到 Anorm 对应项。 DAO 的存在只是为了模拟测试。
【解决方案2】:

我在 scala 中使用 for 理解解决了这个问题。我添加了一些隐式类型转换器来帮助处理错误。

最初我做了类似的事情,

def someAction = Action.async {
  val result = 
    for {
      student <- studentDao.findById(studentId)
      if (student.isDefined)
      parent <- parentDao.findById(student.get.parentId)
      if (parent.isDefined)
      address <- addressDao.findById(parent.get.addressId)
      if (address.isDefined)
    } yield {
      // business logic
    }

  result fallbackTo Future.successful(BadRequest("Something went wrong"))
}

这就是代码最初是如何构建来对抗期货之间的依赖关系的。请注意,每个后续未来都依赖于前一个未来。此外,每个findById 都返回一个Future[Option[T]],因此if 内的for 理解需要处理方法返回None 的情况。如果任何期货评估为None,我在Future 上使用fallbackTo 方法回退到BadRequest 结果(如果任何if 条件在理解范围内失败,它会返回失败的未来)上面的方法是它会抑制遇到的任何类型的异常(即使是像 NPE 这样微不足道的异常),而是简单地回退到 BadRequest,这非常糟糕。

上述方法能够对抗期权的未来并处理失败的情况,尽管准确确定 for 理解中的哪个未来失败并没有帮助。为了克服这个限制,我使用了隐式类型转换器。

object FutureUtils {
  class FutureProcessingException(msg: String) extends Exception(msg)
  class MissingOptionValueException(msg: String) extends FutureProcessingException(msg)

  protected final class OptionFutureToOptionValueFuture[T](f: Future[Option[T]]) {
    def whenUndefined(error: String)(implicit context: ExecutionContext): Future[T] = {
      f.map { value =>
        if (value.isDefined) value.get else throw new MissingOptionValueException(error)
      }
    }
  }

  import scala.language.implicitConversions

  implicit def optionFutureToValueFutureConverter[T](f: Future[Option[T]]) = new OptionFutureToOptionValueFuture(f)

}

上面的隐式转换让我可以为链接多个未来的理解编写可读的代码。

import FutureUtils._

def someAction = Action.async {
  val result = 
    for {
      student <- studentDao.findById(studentId) whenUndefined "Invalid student id"
      parent <- parentDao.findById(student.get.parentId) whenUndefined "Invalid parent id"
      address <- addressDao.findById(parent.get.addressId) whenUndefined "Invalid address id"
    } yield {
      // business logic
    }

  result.recover {
    case fpe: FutureProcessingException => BadRequest(fpe.getMessage)
    case t: Throwable => InternalServerError
  } 
}

上述方法确保所有由缺少 Option 值引起的异常都作为BadRequest 处理,并带有关于究竟失败的特定消息。所有其他失败都被视为InternalServerError。您可以使用堆栈跟踪记录确切的异常以帮助调试。

【讨论】:

    猜你喜欢
    • 2016-01-06
    • 1970-01-01
    • 2015-09-01
    • 1970-01-01
    • 2013-02-21
    • 2013-01-07
    • 1970-01-01
    • 2013-06-19
    • 1970-01-01
    相关资源
    最近更新 更多