【问题标题】:How to mark multiple MongoDB documents for processing?如何标记多个 MongoDB 文档进行处理?
【发布时间】:2019-10-25 15:29:25
【问题描述】:

我正在实现一个用 Node 编写的网络爬虫,并使用 MongoDB 作为我的应用程序的后端来存储页面及其状态。 Crawler 应该能够在多台机器上运行,除此之外,每台机器都会有多个 worker 并行运行,以加快挂起页面的爬取过程。

每个工人都会:

  1. 查询数据库以获取一些仍有待抓取的页面
  2. 将其状态从“待处理”更新为“进行中”
  3. 抓取它们
  4. 将其状态从“进行中”更新为“已完成”

考虑到这一点,我试图找到让多个工作人员同时查询相同页面的方法。

每个工人都有自己的唯一 ID,因此页面只是具有如下结构的文档:

{ uri, status, workerId, <other data> }

我的计划是用当前工作人员 ID 标记 N 文档(通知它们将由该工作人员处理),然后查询它们

类似set workerId to &lt;currentWorkerId&gt; 的文件有:{ "status": "Pending", "workerId": null }

然后查询文件有:{ "status": "Pending", "workerId": "&lt;currentWorkerId&gt;" }

问题是,据我所知,mongo 不支持有限制的更新。当然我可以执行N更新操作更新单个文档,但我想知道这种任务是否有更惯用/优雅的解决方案?

最后,我的目标是确保每当 2 个或更多工作人员查询要处理的页面时,他们不会两次检索相同的页面。

【问题讨论】:

  • 你运行的是什么版本的 MongoDB,你是分片的吗?
  • 这不只是一个关于使用查询更新多个文档的问题吗? stackoverflow.com/questions/1740023/…
  • @RobertMoskal - 是的,但我认为更具体地说,如何构建更新语句的查找部分,以便记录的选择是伪随机的,以便均匀分布。

标签: node.js mongodb


【解决方案1】:

好吧,我想我理解目标 - 您希望更新所有具有待处理状态的文档并为它们分配一个工作人员。您希望在一定程度上平均分配工人。完成工作人员分配后,每个工作人员将确定要扫描的页面。但是您不喜欢一次遍历一个文档的游标,而是希望一次更新一组数据。

这是一个在 updateMany() 函数中使用 $where 条件的示例。请记住 $where 不能使用索引。如果您对“状态”进行索引,您可能还可以,但从性能角度来看,这可能行不通。我的信念是您希望更新所有待处理的记录,因此与一次更新一条记录相比,这种方式对性能的影响可能会更好。此外,我的查询谓词不考虑 workerId 是否为空。这是因为我认为永远不应该出现状态为“待定”且 workerId 不为空的情况。

假设有两个工人,我的想法实现了两个更新语句,一个用于worker0,另一个用于worker1。我假设您的文档有一个名为 _id 的字段,它是一个 ObjectId。策略是使用 _id 字段时间戳。查看时间戳的秒数。对于秒值在 0 到 30 之间的那些分配给 worker0,所有其他分配给 worker1。如果您有更多工人,则需要更改此策略以适应所需工人的数量。

worker0 分配:

db.pages.updateMany({"status": "Pending", $where: function(){
        var seconds = this._id.getTimestamp().getSeconds()
        if(seconds >= 0 && seconds < 30) {
            return true;
        }
        else {
            return false;
        }
    }
}, { $set: { status: "In Progress", workerId: 0} })

worker1 分配:

db.pages.updateMany({"status": "Pending", $where: function(){
        var seconds = this._id.getTimestamp().getSeconds()
        if(seconds >= 30) {
            return true;
        }
        else {
            return false;
        }
    }
}, { $set: { status: "In Progress", workerId: 1} })

一旦运行这些查询,分配就完成了。现在,每个工作人员都可以通过发出各自的查询来确定要抓取哪些页面。例如:

Worker0 识别要抓取的页面:

db.pages.find({status: "In Progress", workerId: 0})

Worker0 已完成:

一旦工作人员抓取了页面,它就可以将记录标记为已完成,以防止未来多次抓取。

db.pages.updateOne({_id: ObjectId("5db0b1953cf0c979dd020fa2")}, { $set: {status: "Finished"}})

结论:

我很好奇您对这种方法的想法,并感谢任何反馈,无论好坏。开火!

经过思考

一种完全不同的方法是在最初使用随机分配插入记录时分配工作人员。但是,这对已经使用空分配创建的记录没有帮助。

【讨论】:

    【解决方案2】:

    不创建单独的调度程序进程来分配工作,也许是一个三阶段的方法。

    1. 查询未决文档,限制仅检索 _id 字段。如果您在 {status:1, workerId:1, _id:1} 上有一个索引,则可以涵盖性能
    2. 使用 $in 运算符进行更新以将状态设置为 In Progress 并分配工作人员 ID
    3. 查询进行中和工作人员 ID

    类似:

    var ids = db.pages.find({status:"pending", workerId: null},{_id:1}).limit(100).toArray().map(p=>p._id)
    
    db.pages.updateMany({_id:{$in:ids}},{$set:{status:"In Progress", worker: MyID}})
    
    var workcursor = db.pages.find({status:"In Progress", worker: MyID})  
    

    如果您有多个工作人员同时进入,则可能会发生一场竞赛,他们可能都试图获取相同的页面。您可以在transaction 中执行上述步骤来避免这种情况。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-23
      • 2020-06-03
      • 1970-01-01
      • 1970-01-01
      • 2019-05-01
      • 2010-11-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多