【问题标题】:Aggregate different states for timestamp periods聚合时间戳期间的不同状态
【发布时间】:2015-07-02 11:45:39
【问题描述】:

我正在使用 mongoDB,需要查询已初始化、正在执行并在一段时间内完成的搜索 nº 操作(每小时或每月...)

我的 json 文档具有以下结构:

{
  "_id" : ObjectId("55263d62c63265b9bb138551"),
  "timestamp" : ISODate("2015-02-12T15:27:48.546Z"),       
  "duration" : 199821
}

timestamp 字段是 procces init start,duration 是执行时间,以毫秒为单位。如果我添加时间戳 + 持续时间 = 完成时间戳

我可以使用此查询对一段时间内的操作数(10 分钟)进行分组:

db.test.aggregate([
  { "$match" :{ 
    "timestamp":{ "$gte": ISODate("2015-0427T12:00:00.0Z") }
  }},
  { "$group" :{
    "_id": { 
      "dayOfMonth":{ "$dayOfMonth": "$timestamp" },
      "month":{ "$month": "$timestamp" }, 
      "hour": { "$hour":"$timestamp" },
      "time": {
        "$subtract": [
          { "$minute":"$timestamp" },
          { "$mod": [{ "$minute": "$timestamp" }, 10] }
        ]
      }
    },
    "count":{ "$sum":1 }
  }},
  { "$sort": { "_id.time": 1 } }
])

但我还需要“执行中”和“已完成”的数量。

我尝试使用 mapreduce 和其他聚合查询,但无法获得与以下类似的结果:

{
 _id: {
  "month" : 03,
  "minute" : 00,
  "Initialized" : 6,
  "InExecution" : 10,
  "Finished": 5
  }
_id: {
  "month" : 03,
  "minute" : 10,
  "Initialized" : 4,
  "InExecution" : 12,
  "Finished": 4
  }
_id: {
  "month" : 03,
  "minute" : 20,
  "Initialized" : 3,
  "InExecution" : 8,
  "Finished": 5
  }  
}

【问题讨论】:

    标签: mongodb mapreduce mongodb-query aggregation-framework


    【解决方案1】:

    这是一个很难理解的问题,但这里的“聚合框架”存在一个主要问题,主要是您的“活动”根据其当前状态存在于几个不同的时间间隔中。

    这意味着它始终存在一个“开始”和一个“完成”区间,以及可能的“几个”区间,其中任务可以被视为“正在执行”。

    聚合框架不能真正一次性做到这一点。但是你可以用 mapReduce 做到这一点:

    db.test.mapReduce(
      function() {
         // Work out time values
         var finished = this.timestamp.valueOf() + this.duration,
             finishedInterval = finished -
               ( finished % ( 1000 * 60 * 10 ) ),
             interval = this.timestamp.valueOf() -
               ( this.timestamp.valueOf() % ( 1000 * 60 * 10 ) );
    
         // Emit initialized
         emit(       
           {
             "year": new Date(interval).getUTCFullYear(),
             "month": new Date(interval).getUTCMonth()+1,
             "day": new Date(interval).getUTCDate(),
             "hour": new Date(interval).getUTCHours(),
             "minute": new Date(interval).getUTCMinutes()
           },
           {
               "Initialized": 1,
               "InExecution": 0,
               "Finshed": 0
           }
         );
    
         // Emit finished
         emit(       
           {
             "year": new Date(finishedInterval).getUTCFullYear(),
             "month": new Date(finishedInterval).getUTCMonth()+1,
             "day": new Date(finishedInterval).getUTCDate(),
             "hour": new Date(finishedInterval).getUTCHours(),
             "minute": new Date(finsihedInterval).getUTCMinutes()
           },
           {
               "Initialized": 0,
               "InExecution": 0,
               "Finshed": 1
           }
         );
    
         // Emit In execution for every 10 minute interval until finished
         if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
           for ( var x = interval; x<finishedInterval; x+= ( 1000 * 60 * 10 ) ) {
             emit(
               {
                 "year": new Date(x).getUTCFullYear(),
                 "month": new Date(x).getUTCMonth()+1,
                 "day": new Date(x).getUTCDate(),
                 "hour": new Date(x).getUTCHours(),
                 "minute": new Date(x).getUTCMinutes()
               },
               {
                 "Initialized": 0,
                 "InExecution": 1,
                 "Finshed": 0
               }
             );
           }
         }
      },
      function(key,values) {
        var result = { "Initialized": 0, "InExecution": 0, "Finshed": 0 };
    
        values.forEach(function(value) {
          Object.keys(value).forEach(function(key) {
              result[key] += value[key];          
          });         
        });
    
        return result;
      },
      { 
        "out": { "inline": 1 },
        "query": { "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } }
      }
    )
    

    如您所见,大部分工作都是在映射器中完成的。这基本上可以计算出任务“开始”和“结束”的时间间隔,并为此发出适当的数据。

    当然,通过从任务的“开始”间隔开始工作,每 10 分钟间隔发出一个“执行中”计数,而该值小于任务的“结束”间隔。

    reducer 只是简单地获取每个间隔的所有发出的计数并将它们相加。所以这是一个非常简单的操作。


    map 和 reduce 逻辑是合理的,但查询选择逻辑存在问题,即“正在完成”或“正在执行”的作业可能会在第一次查询时间之前启动。

    为了做到这一点,您需要修复该查询选择以考虑这一点,并且由于您不存储“完成”时间,因此您需要计算它,这意味着使用 $where 在查询中进行 JavaScript 评估:

    {
      "out": { "inline": 1 },
      "query": {
        "$where": function() {
          return (this.timestamp >= new Date("2015-04-27T12:00:00Z") ||
            new Date(this.timestamp.valueOf() + this.duration) >=
              new Date("2015-04-27T12:00:00Z"))
        }
      }
    }
    

    这会选取在查询开始时间之前仍在运行或在当时完成的项目。

    这不是很好,因为它会扫描集合,因此最好将“finshed”作为值包含在数据中以使查询选择更容易:

    {
      "out": { "inline": 1 },
      "query": {
         "$or": [
             { "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } },
             { "finished": { "$gte": new Date("2015-04-27T12:00:00Z") } }
         ]
      }
    }
    

    这可以使用“索引”并且速度更快。


    最后,这里仍然会在“时间戳”过滤器值“之前”发出值,因为任何一种形式的“完成”都意味着在该时间之前开始的任务。出于同样的原因,最好在查询条件和逻辑上设置“结束”时间。

    为此再次更改选项块以包含要在执行逻辑中使用的“范围”变量,并添加到“查询”条件:

    {
      "out": { "inline": 1 },
      "query": {
        "$or": [
          { 
            "timestamp": { 
              "$gte": new Date("2015-04-27T12:00:00Z"),
              "$lt": new Date("2015-04-28T12:00:00Z")
            }
          },
          { 
            "finished": { 
              "$gte": new Date("2015-04-27T12:00:00Z"),
              "$lt": new Date("2015-04-28T12:00:00Z")
            }
          }
        ]
      },
      "scope": {
          "start": new Date("2015-04-27T12:00:00Z"),
          "finsh": new Date("2015-04-28T12:00:00Z")
      }
    }
    

    然后在每个发射周围添加条件,首先是“interval”大于“start”的started:

         // Emit initialized
         if ( interval >= start.valueOf() ) {
           emit(       
    

    并且在“finishedInterval”小于“finish”的地方完成:

         // Emit finished
         if ( finishedInterval <= finish.valueOf() ) {
           emit(       
    

    然后将循环限制在“执行中”:

         // Emit In execution for every 10 minute interval until finished
         if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
         for ( var x = interval; (( x<finishedInterval ) && ( x<finish.valueOf() )); x+= ( 1000 * 60 * 10 ) ) {
           if ( x > start.valueOf() ) {
             emit(
    

    这为您提供了一个清晰的起点和终点,同时将所有可能的统计信息列在结果中。

    【讨论】:

      【解决方案2】:

      非常感谢布雷克斯,

      为了您的极大兴趣。我一直在研究您的解决方案并想出好主意。

      我找到了一个可能的聚合框架解决方案。

      db.getCollection('test').aggregate(
      {$match:{ 
                 "timestamp":{$exists: true, "$gte": ISODate("2015-03-27T12:00:00.0Z") },              
               } },
      { $project: {
          _id: 1,
          timestamp : 1,
          error: {$cond: [{$eq: ["$severidad", "ERROR"]}, 1, 0]},
      
          init: {$cond: [ {$and : [{$eq: [{"$subtract": [
                                          {"$minute":"$timestamp"},
                                          {"$mod": [{"$minute":"$timestamp"}, 10]}
                                      ]}, {"$subtract": [
                                          {"$minute":"$timestamp"},
                                          {"$mod": [{"$minute":"$timestamp"}, 10]}
                                      ]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
      
          executing: {$cond: [ {$and : [{$gt: [{"$subtract": [
                                          {"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
                                          {"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
                                      ]}, {"$subtract": [
                                          {"$minute":"$timestamp"},
                                          {"$mod": [{"$minute":"$timestamp"}, 10]}
                                      ]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},
      
          finished: {$cond: [ {$and : [{$eq: [{"$subtract": [
                                          {"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}},
                                          {"$mod": [{"$minute":{ $add: [ "$timestamp", "$datos_aplicacion.duracion"]}}, 10]}
                                      ]}, {"$subtract": [
                                          {"$minute":"$timestamp"},
                                          {"$mod": [{"$minute":"$timestamp"}, 10]}
                                   ]}]}, {$ne: ["$severidad", "ERROR"]}]}, 1, 0]},                                    
          }}, 
      {$group :{_id: { 
      
              dayOfMonth:{"$dayOfMonth":"$timestamp"}, month:{"$month":"$timestamp"}, hour:{"$hour":"$timestamp"} ,
              time: {
                      "$subtract": [
                          {"$minute":"$timestamp"},
                          {"$mod": [{"$minute":"$timestamp"}, 10]}
                      ]
                  },            
          },
          NumError: {$sum:"$error"},
          NumInit:{$sum:"$init"},
          NumExecuting:{$sum:"$executing"},
          NumFinished:{$sum:"$finished"}        
      
          }},
      { $sort : { "_id": 1} });
      

      1M 记录需要 1.2 秒

      问候,

      【讨论】:

      • 如果您研究我给出的答案的内容,您应该意识到这些结果并不相同。 “已完成”将仅是在指定期间开始和完成的工作。对于“执行中”也是如此,因为它只报告该期间的作业正在执行,然后仅在该期间执行。如果任何一种类型“跨越边界”(这可能是几毫秒的问题),那么它就不会被拾取。这就是为什么选择 mapReduce 作为答案,我花时间解释这些观点。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-08
      • 1970-01-01
      • 2021-10-29
      • 2021-02-05
      • 1970-01-01
      相关资源
      最近更新 更多