这可能不是发布此内容的最佳位置,但我认为无论如何都值得发布。
每次针对数据库连接定义/创建模型时,我都会调用model.syncIndexes(),这可以确保索引是最新的并且与架构保持同步,但是因为它已在线突出显示(example),这可能会在分布式架构中产生问题,其中多个服务器同时尝试相同的操作。如果使用类似 cluster 库的东西在同一台机器上的多个内核上生成主/从实例,这一点尤其重要,因为当整个服务器启动时,它们通常会在彼此靠近的地方启动。
参考上面的“codebarbarian”文章,当他们声明时清楚地强调了这个问题:
Mongoose 不会为您调用 syncIndexes(),您需要负责
自己调用syncIndexes()。有几个原因,
最值得注意的是 syncIndexes() 不做任何类型的分布式
锁定。如果您有多个服务器调用 syncIndexes() 时
他们开始,你可能会因为试图删除一个索引而出错
已经不存在了。
所以我要做的是创建一个函数,它使用 redis 和 redis redlock 来获得一段时间的租约,以防止多个工作人员(实际上是多个服务器中的多个工作人员)同时尝试相同的同步操作.
它也绕过了整个事情,除非它是试图执行操作的“主人”,我认为将这项工作委派给任何工人没有任何实际意义。
const cluster = require('cluster');
const {logger} = require("$/src/logger");
const {
redlock,
LockError
} = require("$/src/services/redis");
const mongoose = require('mongoose');
// Check is mongoose model,
// ref: https://stackoverflow.com/a/56815793/1834057
const isMongoModel = (obj) => {
return obj.hasOwnProperty('schema') && obj.schema instanceof mongoose.Schema;
}
const syncIndexesWithRedlock = (model,duration=60000) => new Promise(resolve => {
// Ensure the cluster is master
if(!cluster.isMaster)
return resolve(false)
// Now attempt to gain redlock and sync indexes
try {
// Typecheck
if(!model || !isMongoModel(model))
throw new Error('model argument is required and must be a mongoose model');
if(isNaN(duration) || duration <= 0)
throw new Error('duration argument is required, and must be positive numeric')
// Extract name
let name = model.collection.collectionName;
// Define the redlock resource
let resource = `syncIndexes/${name}`;
// Coerce Duration to Integer
// Not sure if this is strictly required, but wtf.
// Will ensure the duration is at least 1ms, given that duration <= 0 throws error above
let redlockLeaseDuration = Math.ceil(duration);
// Attempt to gain lock and sync indexes
redlock.lock(resource,redlockLeaseDuration)
.then(() => {
// Sync Indexes
model.syncIndexes();
// Success
resolve(true);
})
.catch(err => {
// Report Lock Error
if(err instanceof LockError){
logger.error(`Redlock LockError -- ${err.message}`);
// Report Other Errors
}else{
logger.error(err.message);
}
// Fail, Either LockError error or some other error
return resolve(false);
})
// General Fail for whatever reason
}catch(err){
logger.error(err.message);
return resolve(false);
}
});
我不会设置 Redis 连接,这是其他线程的主题,但上面这段代码的重点是展示如何可靠地使用 syncIndexes() 并防止一个线程删除索引而另一个线程出现问题尝试删除相同的索引,或尝试同时修改索引的其他分布式问题。