【Question title】: MongoDB change stream with pipeline using mongoose
【Posted】: 2020-06-04 19:46:05
【Question】:

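I am trying to use a change stream to listen for changes in my MongoDB cluster, but after following several tutorials, my final implementation using mongoose does not work. How can I use my existing mongoose connection to listen for changes to the database?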

Mongoose connection:

mongoose
.connect(db, {
    useNewUrlParser: true,
    useFindAndModify: false,
    useUnifiedTopology: true
    // useCreateIndex: true
})
.then(() => {
    console.log("Connected to MongoDB...");
})
.catch(err => {
    console.log(err);
});

Change stream:

const pipeline = { 
  $match: {
    $or: [{ operationType: 'insert' },{ operationType: 'update' }], 
    'fullDocument.institution': uniId 
  } 
};

const changeStream = Post.watch([pipeline], {fullDocument: 'updateLookup'});

changeStream.on("change", next => {
        switch(next.operationType) {
          case 'insert':
            console.log('an insert happened...', "uni_ID: ", next.fullDocument.institution);
            let rooms = Object.keys(socket.rooms);
            console.log("rooms: ", rooms);

            nmsps.emit('insert', {
              type: 'insert',
              msg: 'New question available',
              newPost: next.fullDocument
            });
            break;

          case 'update':
            console.log('an update happened...');

            nmsps.emit('update', {
              type: 'update',
              postId: next.documentKey._id,
              updateInfo: next.updateDescription.updatedFields,
              msg: "Question has been updated."
            });
            break;

          case 'delete':
            console.log('a delete happened...');

            nmsps.emit('delete', {
              type: 'delete',
              deletedId: next.documentKey._id,
              msg: 'Question has been deleted.'
            });
            break;

          default:
            break;
       }
 })

【Comments】:

    Tags: node.js mongodb mongoose changestream


    【Solution 1】:

    Since mongoose uses the MongoDB driver as its core module, you can use the underlying MongoDB client to watch the change stream.

    After connecting:

    const client = mongoose.connection.client;
    const db = client.db('dbName');
    const collection = db.collection('collectionName');
    const changeStream = collection.watch();
    changeStream.on('change', next => {
        // handle the change event here, e.g. inspect next.operationType
    });
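
    The filter from the question can be passed to `watch()` in the same way. Below is a minimal sketch, not a definitive implementation: `buildPostPipeline` and `watchPosts` are hypothetical helper names introduced for illustration, and `uniId` is the institution id from the question. Note that `delete` events carry no `fullDocument`, so a `$match` on `fullDocument.institution` also filters them out.

```javascript
// Hypothetical helper: builds the $match stage from the question for one institution.
function buildPostPipeline(uniId) {
  return [
    {
      $match: {
        // Same meaning as the question's $or over operationType.
        operationType: { $in: ['insert', 'update'] },
        'fullDocument.institution': uniId
      }
    }
  ];
}

// Hypothetical helper: opens a change stream on any object exposing watch()
// (a driver Collection or a mongoose Model) and attaches the handlers.
function watchPosts(source, uniId, onChange) {
  const changeStream = source.watch(buildPostPipeline(uniId), {
    // Ask the server to attach the current document to update events,
    // so the $match on fullDocument.institution can see it.
    fullDocument: 'updateLookup'
  });
  changeStream.on('change', onChange);
  // Log stream errors instead of letting them go unnoticed.
  changeStream.on('error', err => console.error('change stream error:', err));
  return changeStream;
}
```

    It could be called as `watchPosts(mongoose.connection.client.db('dbName').collection('collectionName'), uniId, next => { ... })` once the connection is open. Change streams are only available on replica sets and sharded clusters, so on a standalone server no events arrive with this code either.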

    【Comments】:

      【Solution 2】:

      Here is working code for a change stream:

      const mongoose = require('mongoose')
      const { connection } = require('../boot/mongo')
      
      const Schema = mongoose.Schema
      
      const status = new Schema({
          _id: {
              type: mongoose.Schema.Types.Number,
              required: true
          },
          receiveTs: {
              type: mongoose.Schema.Types.Date,
              required: true
          }
      })
      
      // pass the schema defined above (the variable is `status`, not `Status`)
      const OnlineStatusSchema = connection.model('Status', status, 'status')
      
      const pipeline = [
          {
              $match: {
                  $or: [{ operationType: 'insert' }, { operationType: 'update' }]
              }
          },
          { $project: { 'fullDocument._id': 1, 'fullDocument.receiveTs': 1 } }
      ]
      
      
      
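      // updateLookup makes update events carry fullDocument, which the $project above relies on
      const changeStream = OnlineStatusSchema.watch(pipeline, { fullDocument: 'updateLookup' })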
      
      changeStream.on('change', async (change) => {
          // get meters reading log for respective platform and date
          try {
              console.log(change)
          } catch (error) {
              // rethrowing inside an async handler would become an unhandled rejection
              console.error(error)
          }
      })
      
      module.exports = OnlineStatusSchema
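
      The code above never closes the change stream. One possible way to do so is sketched below, assuming the stream should stay open until the process is asked to stop; `closeOnShutdown` is a hypothetical helper name, not part of mongoose or the driver. It relies only on `changeStream.close()`, which returns a promise.

```javascript
// Hypothetical helper: closes the change stream at most once
// when the process receives SIGINT or SIGTERM.
function closeOnShutdown(changeStream, proc = process) {
  let closed = false;
  const shutdown = async () => {
    if (closed) return; // ignore repeated signals
    closed = true;
    // Stops the underlying cursor; no further 'change' events are emitted.
    await changeStream.close();
  };
  proc.once('SIGINT', shutdown);
  proc.once('SIGTERM', shutdown);
  // Returned so callers can also close the stream manually.
  return shutdown;
}
```

      It could be called as `closeOnShutdown(changeStream)` right after the `watch()` call. Closing the mongoose connection and exiting the process afterwards is left to the caller, since registering a signal handler replaces Node's default exit behavior for that signal.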
      

      【Comments】:

      • Where is the part that closes the change stream? Please add it.