【问题标题】:How to handle async concurrent requests correctly?如何正确处理异步并发请求?
【发布时间】:2014-08-01 11:38:56
【问题描述】:

假设我有某种游戏。我有一个这样的 buyItem 函数:

buyItem: function (req, res) {
    // query the users balance
    // deduct user balance
    // buy the item
}

如果我向该路由发送垃圾邮件直到扣除用户余额(第二次查询),则用户的余额仍然是正数。

我尝试过的:

buyItem: function (req, res) {
    if(req.session.user.busy) return false;
    req.session.user.busy = true;
    // query the users balance
    // deduct user balance
    // buy the item
}

问题是req.session.user.busy 将是undefined 对于前〜5 个请求。所以这也行不通。

我们如何处理这种情况?如果这很重要,我正在使用 Sails.JS 框架。

【问题讨论】:

  • 你不能直接在模型上调用.update() 跳过余额查询吗?
  • 你给了我一个想法。你的意思是在数据库中设置余额必须是正数,然后运行update 查询,对吧?如果成功(所以余额仍然是正数)我们可以记入该项目吗?
  • 是的,只需设置更新条件,使其仅限于余额大于或等于商品价格的用户。如果查询成功,则将该项目添加到他们的列表/库存/随便什么。

标签: node.js express concurrency sails.js


【解决方案1】:

更新 2

Sails 1.0 现在通过.getDatastore() 方法拥有完整的transaction support。示例:

// Get a reference to the default datastore, and start a transaction.
await sails.getDatastore().transaction(async (db, proceed)=> {
  // Now that we have a connection instance in `db`, pass it to Waterline
  // methods using `.usingConnection()` to make them part of the transaction:
  await BankAccount.update({ balance: 5000 }).usingConnection(db);
  // If an error is thrown, the transaction will be rolled back.
  // Or, you can catch errors yourself and call `proceed(err)`.
  // To commit the transaction, call `proceed()`
  return proceed();
  // You can also return a result with `proceed(null, result)`.
});

更新

正如一些评论者所指出的,启用connection pooling 时,下面的代码不起作用。在最初发布此内容时,默认情况下并非所有适配器都池化,但此时应该假设它们这样做,以便每个单独的方法调用(.query().findOne() 等)都可以在不同的连接上,并在事务之外运行。 Waterline 的下一个主要版本将支持事务,但在那之前,确保您的查询是事务性的唯一方法是使用原始数据库驱动程序包(例如 pgmysql)。

听起来您需要的是transaction。 Sails 尚不支持框架级别的事务(它在路线图上),但如果您使用支持它们的数据库(如 Postgres 或 MySQL),则可以使用模型的 .query() 方法访问底层适配器并运行native commands。这是一个例子:

buyItem: function(req, res) {
  try {
    // Start the transaction
    User.query("BEGIN", function(err) {
      if (err) {throw new Error(err);}
      // Find the user
      User.findOne(req.param("userId").exec(function(err, user) {
        if (err) {throw new Error(err);}
        // Update the user balance
        user.balance = user.balance - req.param("itemCost");
        // Save the user
        user.save(function(err) {
          if (err) {throw new Error(err);}
          // Commit the transaction
          User.query("COMMIT", function(err) {
            if (err) {throw new Error(err);}
            // Display the updated user
            res.json(user);
          });
        });
      });
    });
  } 
  // If there are any problems, roll back the transaction
  catch(e) {
    User.query("ROLLBACK", function(err) {
      // The rollback failed--Catastrophic error!
      if (err) {return res.serverError(err);}
      // Return the error that resulted in the rollback
      return res.serverError(e);
    });
  }
}

【讨论】:

  • 感谢您提供示例。这实际上非常有帮助。实施起来似乎并不难。我会将此标记为已接受的答案。
  • @sgress454 这在使用连接池时有效吗? (我相信默认启用)
  • @chadMcElligott 有一个很好的观点。如果启用了连接池,则不能保证您在每个 User.query 调用中获得相同的连接。如果您希望交易正常工作,这一点很重要
  • @ChadMcElligott,RussellSantos 是的,你们完全正确。在我写这篇文章的时候,默认情况下适配器并没有全部池化。相应地更新我的答案。
  • @sgress454 要清楚,代码 sn-p 是否已更新或仅添加了警告?谢谢!
【解决方案2】:

我还没有测试过。但只要您不使用多个实例或集群,您应该就可以将状态存储在内存中。因为节点是单线程的,所以原子性应该没有任何问题。

var inProgress = {};

function buyItem(req, res) {
    if (inProgress[req.session.user.id]) {
        // send error response
        return;
    }

    inProgress[req.session.user.id] = true;

    // or whatever the function is..
    req.session.user.subtractBalance(10.00, function(err, success) {
        delete inProgress[req.session.user.id];

        // send success response
    });
}

【讨论】:

  • 这不起作用。当您在客户端(5 个 HTTP 请求)中执行循环时,第一个会立即得到响应,其余的会等待几秒钟,然后也变为 true,并且您得到 5 倍的减法余额......不是这样工作的。