【Question title】: How to listen to change in specific field of a nested array in mongodb change streams?
【Posted】: 2026-02-10 22:50:01
【Question description】:

This is the structure of my BSON document in mongodb.

{
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        },
        {
            "date_order": 2,
            "matches": [
                {
                    "match_id": "2B4k0sp29",
                    "time_order": 1,
                    "win": "",
                    "team1": "manU",
                    "team2": "manC"
                },
                {
                    "match_id": "4A4i0sp31",
                    "time_order": 2,
                    "win": "",
                    "team1": "chelsea",
                    "team2": "arsenal"
                }
            ]
        }
    ]
}

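I want to build a notification system that sends a notification when a match finishes. In other words, whenever the value of the win field changes, I want to capture which match was updated. I am using mongodb change streams.

For example, if the match with match_id 3A4j0sp26 has just finished, I want to print this object.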

{
      "match_id": "3A4j0sp26",
      "time_order": 2,
      "win": "team2",
      "team1": "rma",
      "team2": "sev",
      # If possible I also want to find these fields,
      "tournament_id": "P1oi12mwj10b1b",
      "date_order": 1
}

I tried doing it like this.

import pymongo
from bson.json_util import dumps

MONGO_URI = 'mongodb://localhost/mydb'
client = pymongo.MongoClient(MONGO_URI)

filters = []  # How to correctly set this filter ???
'''
What I already tried but failed
filters = [{
        '$match': {
            '$and': [
                {'updateDescription.updatedFields.matches': {'$exists': True}},  # This line needs fixing.
                {'operationType': {'$in': ['replace', 'update']}}
            ]
        }
    }]
'''


change_stream = client.mydb.match.watch(filters)
for change in change_stream:
    print(dumps(change))

I tried debugging without applying a filter. I updated the win field of match_id 3A4j0sp26 to team2 and got this result.

{
  "_id": {
    "_data": "8261252C2F000000012B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1629826095,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    },
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",    # This was updated earlier. I don't want this.
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",     # This is the most recently updated.
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
   }
}

It shows every element in the array, not just the one that was updated.

Edited

The result I got after updating only the "score" field.

{
  "_id": {
    "_data": "8261254598000000022B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1629832600,
      "i": 2
    }
  },
  "ns": {
    "db": "mydb",
    "coll": "match"
  },
  "documentKey": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    }
  },
  "updateDescription": {
    "updatedFields": {
       "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
    },
    "removedFields": []
  }
}

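【Comments】:

  • Try running an unfiltered change stream and then modifying that field, to see what the change event looks like.
  • @Joe, I have also attached the result of the unfiltered stream. Please check. It prints the whole array, so we can't tell which change is the most recent.
  • That's because you did a full document replacement instead of updating a field, so there is no comparison with the previous document. If you modify only the changed field when updating, the change stream will tell you which fields were modified.
  • I tried updating only that field. operationType now shows update. However, it still shows the whole array in updatedFields, not just the updated element. Is it impossible for mongodb to tell which element of the array was updated?

Tags: python mongodb pymongo changestream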


【Solution 1】:

It depends on how you perform the update.

A short test to demonstrate:

Insert a document and start a change stream

PRIMARY> db.updtest.insert({list:[
                         {item:"1",state:"running"},
                         {item:"2",state:"done"},
                         {item:"3",state:"unknown"}
                  ]});

WriteResult({ "nInserted" : 1 })

PRIMARY> let stream = db.updtest.watch()

Updating by setting the whole list field results in a change event that returns the entire array:

PRIMARY> db.updtest.updateOne({},{$set:{list:[
                         {item:"1",state:"running"},
                         {item:"2",state:"done"},
                         {item:"3",state:"running"}
                  ]}});

{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }

PRIMARY> stream.next();

{
    "_id" : {
        "_data" : "82612577BE000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1629845438, 1),
    "ns" : {
        "db" : "test",
        "coll" : "updtest"
    },
    "documentKey" : {
        "_id" : ObjectId("6125779d98787c286c544305")
    },
    "updateDescription" : {
        "updatedFields" : {
            "list" : [
                {
                    "item" : "1",
                    "state" : "running"
                },
                {
                    "item" : "2",
                    "state" : "done"
                },
                {
                    "item" : "3",
                    "state" : "running"
                }
            ]
        },
        "removedFields" : [ ]
    }
}

Updating just one field in one subdocument results in a change event that contains only the modified field:

PRIMARY> db.updtest.update({"list.item":"3"},{$set:{"list.$.state":"done"}});

WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

PRIMARY> stream.next();

{
    "_id" : {
        "_data" : "8261257879000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1629845625, 1),
    "ns" : {
        "db" : "test",
        "coll" : "updtest"
    },
    "documentKey" : {
        "_id" : ObjectId("6125779d98787c286c544305")
    },
    "updateDescription" : {
        "updatedFields" : {
            "list.2.state" : "done"
        },
        "removedFields" : [ ]
    }
}
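
For the doubly nested array in the question, the same kind of targeted update can be issued from PyMongo. The following is a minimal sketch, assuming documents shaped like the one in the question. The helper name build_win_update is hypothetical, and the update relies on the `$[]` and `$[<identifier>]` positional operators together with `array_filters` (available since MongoDB 3.6).

```python
def build_win_update(tournament_id, match_id, winner):
    """Build (query, update, array_filters) that touch only one match's `win` field.

    Hypothetical helper: the field names follow the document in the question.
    """
    # Select the tournament document that contains the match.
    query = {"tournament_id": tournament_id, "matches.matches.match_id": match_id}
    # `$[]` walks every date entry; `$[m]` is limited by the array filter below.
    update = {"$set": {"matches.$[].matches.$[m].win": winner}}
    array_filters = [{"m.match_id": match_id}]
    return query, update, array_filters


query, update, array_filters = build_win_update("P1oi12mwj10b1b", "3A4j0sp26", "team2")

# With a live connection (not executed in this sketch):
# client.mydb.match.update_one(query, update, array_filters=array_filters)
```

Because only one nested field is set, the resulting change event should report a dotted key such as `matches.0.matches.1.win` in updatedFields rather than the whole array.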

If you also use the change stream option that returns the full document, you will get the context around the changed field.
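
Combining the two ideas might look like the sketch below: it reads the dotted keys in updatedFields, then uses the indices in them to pick the match out of fullDocument and attach tournament_id and date_order. The function name finished_matches and the sample event are illustrative assumptions, not part of the original test. The event is hand-written so the code runs without a server.

```python
import re

# Matches keys such as "matches.0.matches.1.win" in updateDescription.updatedFields.
WIN_KEY = re.compile(r"^matches\.(\d+)\.matches\.(\d+)\.win$")


def finished_matches(change):
    """Return the matches whose `win` field was set in this update event."""
    full = change.get("fullDocument")
    if change.get("operationType") != "update" or not full:
        return []
    results = []
    for key in change["updateDescription"]["updatedFields"]:
        found = WIN_KEY.match(key)
        if not found:
            continue
        day = full["matches"][int(found.group(1))]
        match = dict(day["matches"][int(found.group(2))])  # copy, then add context
        match["tournament_id"] = full.get("tournament_id")
        match["date_order"] = day.get("date_order")
        results.append(match)
    return results


# Hand-written sample event (illustrative data based on the question).
sample_event = {
    "operationType": "update",
    "updateDescription": {
        "updatedFields": {"matches.0.matches.1.win": "team2"},
        "removedFields": [],
    },
    "fullDocument": {
        "tournament_id": "P1oi12mwj10b1b",
        "matches": [{
            "date_order": 1,
            "matches": [
                {"match_id": "1A4i0sp34", "time_order": 1, "win": "team1",
                 "team1": "bar", "team2": "psg"},
                {"match_id": "3A4j0sp26", "time_order": 2, "win": "team2",
                 "team1": "rma", "team2": "sev"},
            ],
        }],
    },
}

print(finished_matches(sample_event))
```

Against a live deployment, the events would come from something like `client.mydb.match.watch(full_document="updateLookup")`. Note that the looked-up document is the current version at lookup time, so if the array was changed again in between, the indices may no longer point at the same element.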

【Comments】: