【发布时间】:2019-07-07 05:25:29
【问题描述】:
我想做SELECT .. FOR UPDATE 来锁定表中的一行,以便它可以以原子方式更新,因为可能会发生相同类型的并发请求。
我一直在做一些测试,但不清楚FOR UPDATE 在pg-promise.task 中是否有效。我试图避免使用pg-promise.tx,因为这需要更多的逻辑和可能的递归,我想避免这两种情况,因为用例将是高吞吐量。
更新:
经过更多的研究和测试,我发现使用task 和SELECT .. FOR UPDATE 会给我带来意想不到的结果。下面的代码解释。
submitUserOnline:(pgdb, u_uuid, socketConnectionList, user_websock) =>{
return new Promise((resolve,reject) =>{
// pgdb.tx({mode} t => {
pgdb.task( t => {
return t.one('SELECT * FROM users WHERE user_uuid = $1', [u_uuid]
.then(user =>{
// nothing happens here right now, but may in future, including for completeness. May cancel request based off some comparisons.
return t.any('SELECT * FROM users_online WHERE user_uuid = $1 FOR UPDATE',[u_uuid]
.then(result =>{
if(result.length > 0){
console.error('duplicate connection found ' + user_websock.uuid);
if(socketConnectionList[result[0].web_sock_uuid] !== undefined){
console.error('drop connection' + result[0].web_sock_uuid);
socketConnectionList[result[0].web_sock_uuid].wsc.close(4020, 'USER_RECONN');
}
console.error('duplicate connection found - update next ' + user_websock.uuid);
return t.none('UPDATE users_online SET web_sock_uuid = $1 WHERE user_uuid = $2', [user_websock, u_uuid])
.then(res =>{
console.error('UPDATE res: ' + res);
})
.catch(err =>{
console.error('UPDATE err: ' + err);});
}else{
// not reached in test case
console.error('no duplicate found ' + user_websock.uuid);
return t.none(INSERT INTO users_online.....ect ect
}
});
})
.then(res =>{
console.error('>>>task/tx res: ' + res);
resolve({msg: "OK"});
})
.catch(err =>{
console.error('>>>task/tx err: ' + err);
if(err.code ==== '40001'){// recursion for when called as 'tx'
console.error('>>>task/tx err - call recurse');
module.exports.submitUserOnline(pgdb, u_uuid, socketConnectionList, user_websock)
.then(res =>{
console.error('>>>task/tx err - call recurse - res ' + res);
resolve({msg: "OK"});
})
.catch( err =>{
console.error('>>>task/tx err - call recurse - err: ' + err);
reject({msg:"FAILED"});
});
}
});
});
}
const mode = new TransactionMode({
tiLevel: isolationLevel.serializable,
readOnly: false,
deferrable: true
});
submitUserOnline 由 websocket 处理程序调用。在我的测试用例中,我有一个 10 元素数组(相同的 user_uuid),它在 for 循环中触发所有客户端连接。基本上它与服务器 websocket 建立连接,该套接字检查特定用户的 users_online 表,如果该用户已经在表中,则终止陈旧连接并更新表中的 web_sock_uuid,这就是出现问题(有时它工作有时不工作,一次运行 10 个并发连接显示问题)。当用户行web_sock_uuid 是UPDATE'ed 时,其他并发连接似乎在SELECT .. FOR UPDATE(SLFU) 上正确阻塞,当UPDATE 执行时,then() 并不总是在下一个SLFU 被释放之前运行。这似乎以待处理的SLFU 的形式呈现它自己,返回前一个UPDATE 之前行的旧web_sock_uuid。在一个实例中,连续 4 次返回相同的陈旧“web_sock_uuid”。
如果我从 task 方法切换到需要递归调用的 tx 方法,上面的代码会按预期工作,尽管它确实需要多次递归。
【问题讨论】:
-
这里的任何问题都应该包括您尝试过的内容以及遇到的问题。同时,方法 task 和 tx 并不关心你在里面执行什么样的查询。
-
@vitaly-t : 很高兴看到这引起了你的注意,请参阅更新的 OP。
-
您应该使用pg-monitor 为您提供有关正在执行的连接和查询的清晰图片。可能那时你会自己发现一个问题;)
标签: node.js postgresql-9.5 pg-promise