使用小鹿
这个问题是Fawn library 的用法之一,它来自对猫鼬模型的命名以及它们如何与库本身交互的一些误解。因此,最好的演示方式是使用工作代码的最小示例:
const { Schema } = mongoose = require('mongoose');
const Fawn = require('fawn');
const uri = 'mongodb://localhost:27017/fawndemo';
const opts = { useNewUrlParser: true };
// sensible defaults
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
mongoose.set('useFindAndModify', false);
mongoose.set('useCreateIndex', true);
// schema defs
const oneSchema = new Schema({
name: String
});
const twoSchema = new Schema({
counter: Number
});
// don't even need vars since we access model by name
mongoose.model('One', oneSchema);
mongoose.model('Two', twoSchema);
// log helper
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri, opts);
// init fawm
Fawn.init(mongoose);
// Clean models
await Promise.all(
Object.entries(conn.models).map(([k,m]) => m.deleteMany())
)
// run test
let task = Fawn.Task();
let results = await task
.save('One', { name: 'Bill' })
.save('Two', { counter: 0 })
.update('Two', { }, { "$inc": { "counter": 1 } })
.run({ useMongoose: true });
log(results);
// List objects in models
for ( [k,m] of Object.entries(conn.models) ) {
let result = await m.find();
log(result);
}
} catch(e) {
console.error(e)
} finally {
mongoose.disconnect()
}
})()
注意这里的猫鼬模型是如何注册的:
mongoose.model('One', oneSchema);
mongoose.model('Two', twoSchema);
第一个参数是 mongoose 在其内部逻辑中用于模型的注册名称。从 mongoose 本身的角度来看,一旦你像上面那样在 schema 中注册了模型名称,你就可以真正调用模型的一个实例,如下所示:
const One = mongoose.model('One');
通常人们export 初始注册的结果,然后只使用返回值,这是对 mongoose 自己的模型详细信息和附加模式的内部存储的引用。但只要注册码已经运行,这行代码就等价了。
考虑到这一点的典型exports 因此可以用作:
require('./models/one');
require('./models/two');
let results = await mongoose.model('One').find();
因此,您可能不会在其他代码示例中经常看到这一点,但这确实是为了从 Fawn 库的角度用后面的代码展示实际发生的事情。
有了这些知识,您可以考虑清单中的以下代码:
let task = Fawn.Task();
let results = await task
.save('One', { name: 'Bill' })
.save('Two', { counter: 0 })
.update('Two', { }, { "$inc": { "counter": 1 } })
.run({ useMongoose: true });
这里,猫鼬和 MongoDB 用户熟悉的 update() 和 save() 方法实际上在 Fawn.Task() 结果上具有特定于其实现的不同的第一个参数。第一个参数是 mongoose 的 “注册模型名称”,这就是我们刚刚在前面的示例中解释的内容。
Fawn 库实际上在做的是调用类似的代码:
mongoose.model('One').save({ name: 'Bill' })
实际上,它所做的事情比示例清单的输出中所证明的要复杂得多。它实际上做了很多与two phase commits 相关的其他事情,并在另一个集合中写入临时条目,并最终将它们移至目标集合。但是,当它确实进入已注册模型的集合时,它基本上就是这样做的。
所以问题代码中的核心问题是您没有使用实际注册到猫鼬模型的名称,并且文档步骤中缺少其他一些内容。
您也没有正确等待异步函数,并且问题代码中的 try..catch 在此上下文中没有对调用执行任何操作。然而,此处的清单演示了如何使用 async/await 正确执行此操作。
如果你的 NodeJS 版本不支持async/await,你也可以只使用原生的Promise.then(...).catch(...) 方法,但除了这样做之外几乎没有其他改变,当然删除try..catch,因为这种形式的承诺会忽略它。这就是为什么你改为catch()。
注意 - 通过一些简短的测试,似乎有很多东西是支持的 mongoose/mongodb 功能,但实际上并没有在这个库的方法上实现和支持。值得注意的是,“upserts”是一个有用且常见的例子,这里实现的“两阶段提交”系统似乎根本不支持。
这在某种程度上似乎是库代码中的疏忽,其中方法的某些“选项”实际上被忽略或完全剥离。这是充分利用 MongoDB 功能的一个问题。
交易
这个库的整个用法至少在我看来是可疑的,因为你“认为”这是“交易”,所以你选择了它。明确地说,两阶段提交不是一个事务。此外,在此类控制和回滚等方面的任何尝试的实施似乎充其量都是非常松散的。
如果您拥有现代 MongoDB 4.0 或更高版本的服务器,并且您实际将其配置为命名为“副本集”(您也可以为 single 成员执行此操作,其中一个常见的误解是你需要不止一个)然后有support for real transactions,它们很容易实现:
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost:27017/trandemo';
const opts = { useNewUrlParser: true };
// sensible defaults
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
mongoose.set('useFindAndModify', false);
mongoose.set('useCreateIndex', true);
// schema defs
const orderSchema = new Schema({
name: String
});
const orderItemsSchema = new Schema({
order: { type: Schema.Types.ObjectId, ref: 'Order' },
itemName: String,
price: Number
});
const Order = mongoose.model('Order', orderSchema);
const OrderItems = mongoose.model('OrderItems', orderItemsSchema);
// log helper
const log = data => console.log(JSON.stringify(data, undefined, 2));
// main
(async function() {
try {
const conn = await mongoose.connect(uri, opts);
// clean models
await Promise.all(
Object.entries(conn.models).map(([k,m]) => m.deleteMany())
)
let session = await conn.startSession();
session.startTransaction();
// Collections must exist in transactions
await Promise.all(
Object.entries(conn.models).map(([k,m]) => m.createCollection())
);
let [order] = await Order.create([{ name: 'Bill' }], { session });
let items = await OrderItems.insertMany(
[
{ order: order._id, itemName: 'Cheese', price: 1 },
{ order: order._id, itemName: 'Bread', price: 2 },
{ order: order._id, itemName: 'Milk', price: 3 }
],
{ session }
);
// update an item
let result1 = await OrderItems.updateOne(
{ order: order._id, itemName: 'Milk' },
{ $inc: { price: 1 } },
{ session }
);
log(result1);
// commit
await session.commitTransaction();
// start another
session.startTransaction();
// Update and abort
let result2 = await OrderItems.findOneAndUpdate(
{ order: order._id, itemName: 'Milk' },
{ $inc: { price: 1 } },
{ 'new': true, session }
);
log(result2);
await session.abortTransaction();
/*
* $lookup join - expect Milk to be price: 4
*
*/
let joined = await Order.aggregate([
{ '$match': { _id: order._id } },
{ '$lookup': {
'from': OrderItems.collection.name,
'foreignField': 'order',
'localField': '_id',
'as': 'orderitems'
}}
]);
log(joined);
} catch(e) {
console.error(e)
} finally {
mongoose.disconnect()
}
})()
这实际上只是一个包含 Order 类和相关 OrderItems 的简单列表。代码中确实没有什么特别之处,您应该看到它与您将看到的大多数清单示例基本相同,只是做了一些小改动。
值得注意的是,我们初始化了 session 和 session.startTransaction() 作为交易应该正在进行的指示器。请注意,session 通常具有更广泛的范围,您通常会将该对象重用于多个操作。
现在你有session 并且事务开始了,这只是添加到正在执行的各种语句的“选项”中:
let [order] = await Order.create([{ name: 'Bill' }], { session });
let items = await OrderItems.insertMany(
[
{ order: order._id, itemName: 'Cheese', price: 1 },
{ order: order._id, itemName: 'Bread', price: 2 },
{ order: order._id, itemName: 'Milk', price: 3 }
],
{ session }
);
诚然,这是一个简短的示例,并未完全涵盖所有写入错误的可能性以及如何在单独的 try..catch 块中处理它。但作为一个非常基本的示例,如果在调用 session.commitTransaction() 之前发生任何错误,那么自事务启动以来的所有操作都不会真正持久化在会话中。
还有一个“因果一致性”,一旦一个正常的写确认被确认,那么在session的范围内,数据似乎被写入了各自的集合直到事务提交或回滚。
如果发生回滚(如最终操作所示):
// Update and abort
let result2 = await OrderItems.findOneAndUpdate(
{ order: order._id, itemName: 'Milk' },
{ $inc: { price: 1 } },
{ 'new': true, session }
);
log(result2);
await session.abortTransaction();
这些写入虽然报告为在操作结果中看到的那样,但确实是“回滚”,并且进一步的操作会在进行这些更改之前看到数据的状态。
完整的示例代码通过在一个事务中添加具有另一个更新操作的项目来演示这一点,然后开始另一个以更改数据并读取数据,然后中止事务。最终的数据状态当然只显示实际提交的内容。
注意操作如find() 和findOne() 或任何检索数据的操作必须在事务处于活动状态时包含session 以便查看当前状态,就像清单中显示的写操作一样。
如果不包括session,这些状态更改在事务解决之前不会在“全局”范围内可见。
列出输出
给定的代码清单在运行时会产生以下输出,以供参考。
小鹿演示
Mongoose: ones.deleteMany({}, {})
Mongoose: twos.deleteMany({}, {})
Mongoose: ojlinttaskcollections.deleteMany({}, {})
Mongoose: ojlinttaskcollections.insertOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a"), steps: [ { dataStore: [], _id: ObjectId("5bf765f7e5c71c5fae77030d"), index: 0, type: 'save', state: 0, name: 'One', data: { name: 'Bill' } }, { dataStore: [], _id: ObjectId("5bf765f7e5c71c5fae77030c"), index: 1, type: 'save', state: 0, name: 'Two', data: { counter: 0 } }, { dataStore: [], _id: ObjectId("5bf765f7e5c71c5fae77030b"), index: 2, type: 'update', state: 0, name: 'Two', data: { '*_**ojlint**escape$*__tx__00***___string$inc': { counter: 1 } } } ], __v: 0 })
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.0.state': 1 } })
Mongoose: ones.insertOne({ _id: ObjectId("5bf765f7e5c71c5fae77030e"), name: 'Bill', __v: 0 })
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.0.state': 2 } })
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.1.state': 1 } })
Mongoose: twos.insertOne({ _id: ObjectId("5bf765f7e5c71c5fae77030f"), counter: 0, __v: 0 })
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.1.state': 2 } })
Mongoose: twos.find({})
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.2.state': 1 } })
Mongoose: twos.update({}, { '$inc': { counter: 1 } }, {})
(node:24494) DeprecationWarning: collection.update is deprecated. Use updateOne, updateMany, or bulkWrite instead.
Mongoose: ojlinttaskcollections.updateOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") }, { '$set': { 'steps.2.state': 2 } })
Mongoose: ojlinttaskcollections.deleteOne({ _id: ObjectId("5bf765f7e5c71c5fae77030a") })
[
{
"_id": "5bf765f7e5c71c5fae77030e",
"name": "Bill",
"__v": 0
},
{
"_id": "5bf765f7e5c71c5fae77030f",
"counter": 0,
"__v": 0
},
{
"n": 1,
"nModified": 1,
"opTime": {
"ts": "6626877488230301707",
"t": 139
},
"electionId": "7fffffff000000000000008b",
"ok": 1,
"operationTime": "6626877488230301707",
"$clusterTime": {
"clusterTime": "6626877488230301707",
"signature": {
"hash": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=",
"keyId": 0
}
}
}
]
Mongoose: ones.find({}, { projection: {} })
[
{
"_id": "5bf765f7e5c71c5fae77030e",
"name": "Bill",
"__v": 0
}
]
Mongoose: twos.find({}, { projection: {} })
[
{
"_id": "5bf765f7e5c71c5fae77030f",
"counter": 1,
"__v": 0
}
]
Mongoose: ojlinttaskcollections.find({}, { projection: {} })
[]
转换演示
Mongoose: orders.deleteMany({}, {})
Mongoose: orderitems.deleteMany({}, {})
Mongoose: orders.insertOne({ _id: ObjectId("5bf7661c3f60105fe48d076e"), name: 'Bill', __v: 0 }, { session: ClientSession("e146c6074bb046faa7b70ed787e1a334") })
Mongoose: orderitems.insertMany([ { _id: 5bf7661c3f60105fe48d076f, order: 5bf7661c3f60105fe48d076e, itemName: 'Cheese', price: 1, __v: 0 }, { _id: 5bf7661c3f60105fe48d0770, order: 5bf7661c3f60105fe48d076e, itemName: 'Bread', price: 2, __v: 0 }, { _id: 5bf7661c3f60105fe48d0771, order: 5bf7661c3f60105fe48d076e, itemName: 'Milk', price: 3, __v: 0 } ], { session: ClientSession("e146c6074bb046faa7b70ed787e1a334") })
Mongoose: orderitems.updateOne({ order: ObjectId("5bf7661c3f60105fe48d076e"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("e146c6074bb046faa7b70ed787e1a334") })
{
"n": 1,
"nModified": 1,
"opTime": {
"ts": "6626877647144091652",
"t": 139
},
"electionId": "7fffffff000000000000008b",
"ok": 1,
"operationTime": "6626877647144091652",
"$clusterTime": {
"clusterTime": "6626877647144091652",
"signature": {
"hash": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=",
"keyId": 0
}
}
}
Mongoose: orderitems.findOneAndUpdate({ order: ObjectId("5bf7661c3f60105fe48d076e"), itemName: 'Milk' }, { '$inc': { price: 1 } }, { session: ClientSession("e146c6074bb046faa7b70ed787e1a334"), upsert: false, remove: false, projection: {}, returnOriginal: false })
{
"_id": "5bf7661c3f60105fe48d0771",
"order": "5bf7661c3f60105fe48d076e",
"itemName": "Milk",
"price": 5,
"__v": 0
}
Mongoose: orders.aggregate([ { '$match': { _id: 5bf7661c3f60105fe48d076e } }, { '$lookup': { from: 'orderitems', foreignField: 'order', localField: '_id', as: 'orderitems' } } ], {})
[
{
"_id": "5bf7661c3f60105fe48d076e",
"name": "Bill",
"__v": 0,
"orderitems": [
{
"_id": "5bf7661c3f60105fe48d076f",
"order": "5bf7661c3f60105fe48d076e",
"itemName": "Cheese",
"price": 1,
"__v": 0
},
{
"_id": "5bf7661c3f60105fe48d0770",
"order": "5bf7661c3f60105fe48d076e",
"itemName": "Bread",
"price": 2,
"__v": 0
},
{
"_id": "5bf7661c3f60105fe48d0771",
"order": "5bf7661c3f60105fe48d076e",
"itemName": "Milk",
"price": 4,
"__v": 0
}
]
}
]