【问题标题】:MongoDb Aggregation - Splitting into time bucketsMongoDb 聚合 - 拆分为时间桶
【发布时间】:2015-10-20 04:51:15
【问题描述】:

是否可以使用 MongoDB 聚合框架生成时间序列输出,其中任何被认为属于每个存储桶的源文档都被添加到该存储桶中?

假设我的收藏看起来像这样:

/*light_1 on from 10AM to 1PM*/
{
    "_id" : "light_1",
    "on" : ISODate("2015-01-01T10:00:00Z"),
    "off" : ISODate("2015-01-01T13:00:00Z"),

},
/*light_2 on from 11AM to 7PM*/
{
    "_id" : "light_2",
    "on" : ISODate("2015-01-01T11:00:00Z"),
    "off" : ISODate("2015-01-01T19:00:00Z")
}

..我使用 6 小时的存储桶间隔来生成 2015-01-01 的报告。我希望我的结果看起来像:

    {
        "start"         : ISODate("2015-01-01T00:00:00Z"),
        "end"           : ISODate("2015-01-01T06:00:00Z"),
        "lights"        : []
    },
    {
        "start"         : ISODate("2015-01-01T06:00:00Z"),
        "end"           : ISODate("2015-01-01T12:00:00Z"),
        "lights_on"     : ["light_1", "light_2"]
    },
    {
        "start"         : ISODate("2015-01-01T12:00:00Z"),
        "end"           : ISODate("2015-01-01T18:00:00Z"),
        "lights_on"     : ["light_1", "light_2"]
    },
    {
        "start"         : ISODate("2015-01-01T18:00:00Z"),
        "end"           : ISODate("2015-01-02T00:00:00Z"),
        "lights_on"     : ["light_2"]
    }

如果灯的“开”值 = 桶“开始”,则灯在一定范围内被视为“开”

我知道我可以使用 $group 和聚合日期运算符按开始时间或结束时间进行分组,但在这种情况下,它是一对一的映射。在这里,如果一个源文档跨越多个桶,它可能会分成几个时间桶。

报告范围和间隔跨度直到运行时才知道。

【问题讨论】:

  • "如果灯的'on'值>=桶'start',并且它的'off'值
  • light_1 的 "off" 是 ISODate("2015-01-01T13:00:00Z") 不小于 "end" : ISODate("2015-01-01T12:00:00Z")
  • 抱歉——我已经修改了上面的描述来纠正这个问题。基本上,如果桶和灯的时间跨度之间有重叠,灯就会亮。

标签: mongodb mapreduce time-series mongodb-query aggregation-framework


【解决方案1】:

简介

您的目标需要考虑一下何时记录事件,因为您将事件结构化到给定的时间段聚合中。显而易见的一点是,您所代表的单个文档实际上可以代表最终汇总结果中要在“多个”时间段内报告的事件。

分析结果表明这是一个超出aggregation framework 范围的问题,因为要查找的时间段。某些事件需要在可以分组的范围之外“生成”,您应该能够看到。

为了做到这一点,你需要mapReduce。这具有通过 JavaScript 进行的“流控制”,因为它的处理语言能够从本质上确定开/关之间的时间是否跨越多个时期,因此发出它发生在多个时期中的数据。

作为旁注,“灯”可能不是最适合_id,因为它可能在一天内多次打开/关闭。所以开/关的“实例”可能更好。但是,我只是在此处按照您的示例进行转换,因此只需将映射器代码中对 _id 的引用替换为表示灯光标识符的任何实际字段即可。

但到代码上:

// start date and next date for query ( should be external to main code )
var oneHour = ( 1000 * 60 * 60 ),
    sixHours = ( oneHour * 6 ),
    oneDay = ( oneHour * 24 ),
    today = new Date("2015-01-01"),               // your input
    tomorrow = new Date( today.valueOf() + oneDay ),
    yesterday = new Date( today.valueOf() - sixHours ),
    nextday = new Date( tomorrow.valueOf() + sixHours);

// main logic
db.collection.mapReduce(
    // mapper to emit data
    function() {
        // Constants and round date to hour
        var oneHour = ( 1000 * 60 * 60 )
            sixHours = ( oneHour * 6 )
            startPeriod = new Date( this.on.valueOf() 
              - ( this.on.valueOf() % oneHour )),
            endPeriod = new Date( this.off.valueOf()
              - ( this.off.valueOf() % oneHour ));

        // Hour to 6 hour period and convert to UTC timestamp
        startPeriod = startPeriod.setUTCHours( 
            Math.floor( startPeriod.getUTCHours() / 6) * 6 );
        endPeriod = endPeriod.setUTCHours( 
            Math.floor( endPeriod.getUTCHours() / 6) * 6 );

        // Init empty reults for each period only on first document processed
        if ( counter == 0 ) {
            for ( var x = startDay.valueOf(); x < endDay.valueOf(); x+= sixHours ) {
                emit(
                    { start: new Date(x), end: new Date(x + sixHours) },
                    { lights_on: [] }
                );
            }
        }

        // Emit for every period until turned off only within the day
        for ( var x = startPeriod; x <= endPeriod; x+= sixHours ) {
           if ( ( x >= startDay ) && ( x < endDay ) ) {
               emit(
                   { start: new Date(x), end: new Date(x + sixHours)  },
                   { lights_on: [this._id] }
               );
           }
        }
        counter++;
    },

    // reducer to keep all lights in one array per period
    function(key,values) {
        var result = { lights_on: [] };
        values.forEach(function(value) {
            value.lights_on.forEach(function(light){
                if ( result.lights_on.indexOf(light) == -1 )
                    result.lights_on.push(light);
            });
        });
        result.lights_on.sort();
        return result;
    },

    // options and query
    { 
        "out": { "inline": 1 },
        "query": {
            "on": { "$gte": yesterday, "$lt": tomorrow }, 
            "$or": [
                { "off": { "$gte:"  today, "$lt": nextday } },
                { "off": null },
                { "off": { "$exists": false } }
            ]
        },
        "scope": { 
            "startDay": today,
            "endDay": tomorrow,
            "counter": 0
        }
    }
)

映射和归约

本质上,“映射器”函数查看当前记录,将每个开/关时间四舍五入到小时,然后计算出事件发生在哪个六小时期间的开始时间。

使用这些新的日期值,将启动一个循环以获取开始的“开启”时间,并在该期间在单个元素数组中发出当前“灯”打开的事件,如下所述。每个循环将开始时间增加六个小时,直到达到“熄灯”结束时间。

这些出现在 reducer 函数中,它需要与它返回的相同的预期输入,因此灯阵列在值对象内的周期内打开。它在与这些值对象列表相同的键下处理发出的数据。

首先迭代要归约的值列表,然后查看可能来自之前的归约通道的内部灯光数组,并将其中的每一个处理成一个独特的灯光结果数组。只需在结果数组中查找当前光照值并将其推送到不存在的那个数组即可。

注意“前一次传递”,好像您不熟悉 mapReduce 的工作原理,那么您应该了解 reducer 函数本身发出的结果可能无法通过处理“所有”可能的值来实现一次通过“关键”。它可以并且通常只处理键的发出数据的“子集”,因此将采用与从映射器发出数据相同的方式将“缩减”结果作为输入。

这就是为什么 mapper 和 reducer 都需要输出具有相同结构的数据的原因,因为 reducer 本身也可以从之前已缩减的数据中获取输入。这就是 mapReduce 处理发出大量相同键值的大型数据集的方式。它通常以“块”的形式处理,而不是一次全部处理。

结束减少归结为时段内打开的灯的列表,每个时段的开始和结束都作为发出的键。像这样:

    {
        "_id": {
            "start": ISODate("2015-01-01T06:00:00Z"),
            "end": ISODate("2015-01-01T12:00:00Z")
        },
        {
            "result": {
                "lights_on": [ "light_1", "light_2" ]
            }
        }
    },

“_id”、“result”结构只是所有 mapReduce 输出如何输出的属性,但所需的值都在那里。

查询

现在这里还有一个关于查询选择的注释,需要考虑到灯可能已经在当天开始之前的某个日期通过其集合条目“打开”。同样的道理,它也可以在报告当前日期之后“关闭”,并且实际上可能具有null 值或文档中没有“关闭”键,具体取决于您的数据存储方式以及实际观察的日期。

该逻辑从要报告的当天开始创建一些所需的计算,并考虑该日期之前和之后的六小时期间,并列出查询条件:

        {
            "on": { "$gte": yesterday, "$lt": tomorrow }, 
            "$or": [
                { "off": { "$gte:"  today, "$lt": nextday } },
                { "off": null },
                { "off": { "$exists": false } }
            ]
        }

那里的基本选择器使用$gte$lt的范围运算符来分别在他们正在测试的值的字段上查找大于或等于和小于的值,以便查找数据在合适的范围内。

$or 条件内,考虑了“off”值的各种可能性。要么它属于范围标准,要么具有null 值,或者可能通过$exists 运算符在文档中根本没有键。这取决于$or 内的这些条件的要求,在尚未关闭灯的情况下,您实际上如何表示“关闭”,但这些将是合理的假设。

与所有 MongoDB 查询一样,除非另有说明,否则所有条件都是隐含的“AND”条件。

这仍然有些缺陷,具体取决于灯可能会打开多长时间。但是这些变量都是有意在外部列出的,以便根据您的需求进行调整,并考虑到在报告日期之前或之后获取的预期持续时间。

创建空时间序列

这里的另一个注意事项是,数据本身可能没有任何事件显示在给定时间段内点亮。出于这个原因,在 mapper 函数中嵌入了一个简单的方法,可以查看我们是否处于结果的第一次迭代中。

仅在第一次时,会发出一组可能的周期键,其中包括一个空数组,用于在每个周期中打开的灯。这使得报告还可以显示那些根本没有灯亮的时段,因为这被插入到发送到减速器和输出的数据中。

您可能会对此方法有所不同,因为它仍然依赖于某些满足查询条件的数据才能输出任何内容。因此,为了迎合没有数据记录或不符合标准的真正“空白日”,最好创建一个所有键的外部哈希表,所有键都显示灯的空结果。然后将 mapReduce 操作的结果“合并”到那些预先存在的键中以生成报告。

总结

这里有许多关于日期的计算,并且不知道实际的最终语言实现,我只是单独声明任何在实际 mapReduce 操作外部起作用的东西。所以任何看起来像重复的东西都是为了这个意图而做的,使逻辑语言的那部分独立。大多数编程语言都支持根据使用的方法操作日期的功能。

所有语言特定的输入都作为选项块传入,此处显示为 mapReduce 方法的最后一个参数。值得注意的是,查询的参数化值都是从要报告的日期计算出来的。然后是“范围”,它是一种传递值的方式,可以被 mapReduce 操作中的函数使用。

考虑到这些因素,mapper 和 reducer 的 JavaScript 代码保持不变,因为这是方法所期望的输入。过程中的任何变量都由范围和查询结果提供,以便在不更改代码的情况下获得结果。

因此,主要是因为“灯亮”的持续时间可以跨越要报告的不同时期,这成为聚合框架不适合做的事情。它无法执行获得结果所需的“循环”和“数据发送”,因此我们为什么要使用 mapReduce 来代替此任务。

也就是说,很好的问题。我不知道您是否已经考虑过如何在此处获得结果的概念,但至少现在有一个指南可供处理类似问题的人使用。

【讨论】:

  • 感谢您提供信息丰富的回复!您当然是正确的,灯光名称不是灯光测量指标的好 ID。我正在处理的实际问题具有更复杂的数据结构,我是手动简化的。
  • @DavidBlack 不是问题。写到一半时我就睡着了,直到提交时才注意到另一个答案。正如我在那里评论的那样,给出的过程大部分是正确的,但是在编写最终代码时应该注意此清单中的显着差异。主要是在选择和减速器操作中,另一个答案给出的错误,如果你这样编码,你会失败。
  • 我实际上有 1 亿条这样的记录要处理 - 还有其他分组阶段和过滤器要处理,所以我担心性能。聚合框架在计算非时间序列指标方面做得很好(例如,按总时间排序的某种类型的前 N ​​个灯等)
  • @DavidBlack 但由于上述原因,它不能解决这个问题。您的数据显示了一个“亮灯”,可能跨越您要报告的多个“桶”。聚合框架很棒,但它不能做到这一点。正如我所说,通读一遍,至少去掉与你之前接受的答案不同的点,因为你不希望它失败。
  • 非常好的 mapReduce 框架实例。我重新使用它来计算 CDR 数据库中每秒的同时呼叫数。有一点是我必须将.valueOf() 添加到第二个发射周期中的所有日期时间表达式中,例如for ( var x = startPeriod.valueOf(); x &lt;= endPeriod.valueOf(); x+= sixHours )if ( ( x &gt;= startDay.valueOf() ) &amp;&amp; ( x &lt; endDay.valueOf() ) ),以使其在Mongo 3.0.12 上正常工作
【解决方案2】:

我最初误解了你的问题。假设我了解您现在需要什么,这看起来更像是 map-reduce 的工作。我不确定你是如何确定范围或区间跨度的,所以我会制作这些常量,你可以正确修改那段代码。你可以这样做:

var mapReduceObj = {};

mapReduceObj.map = function() {
    var start = new Date("2015-01-01T00:00:00Z").getTime(),
    end = new Date("2015-01-02T00:00:00Z").getTime(),
    interval = 21600000;                     //6 hours in milliseconds

    var time = start;
    while(time < end) {
        var endtime = time + interval;
        if(this.on < endtime && this.off >= time) {
            emit({start : new Date(time), end : new Date(endtime)}, [this._id]);
            break;
        }

        time = endtime;
    }
};

mapReduceObj.reduce = function(times, light_ids) {
    var lightsArr = {lights : []};

    for(var i = 0; i < light_ids.length; i++) {
        lightsArr.lights.push(light_ids[i]);
    }

    return lightsArr;
};

结果将具有以下形式:

results :    {
    _id     :    {
        start   :   ISODate("2015-01-01T06:00:00Z"),
        end     :   ISODate("2015-01-01T12:00:00Z")
    },
    value   :    {
        lights  :    [
            "light_6",
            "light_7"
        ]
    },
    ...
}

~~~原答案~~~

这应该为您提供所需的确切格式。

db.lights.aggregate([
    { "$match": {
        "$and": [ 
            { on  : { $lt : ISODate("2015-01-01T06:00:00Z") } },
            { off : { $gte: ISODate("2015-01-01T12:00:00Z") } }
        ]
    }},
    { "$group": {
        _id         :   null,
        "lights_on" : {$push : "$_id"}
    }},
    { "$project": {
        _id     :    false,
        start   :    { $add : ISODate("2015-01-01T06:00:00Z") },
        end     :    { $add : ISODate("2015-01-01T12:00:00Z") },
        lights_on:   true
    }}
]);

首先,$match 条件查找所有满足您的时间限制的文档。然后$group 将_id 字段(在本例中为light_n,其中n 是一个整数)推送到lights_on 字段中。可以使用$addToSet$push,因为_id 字段是唯一的,但是如果您使用的字段可能有重复项,则需要确定数组中的重复项是否可以接受。最后,使用$project 获得您想要的确切格式。

【讨论】:

  • 感谢 c1moor 和 Aswin Jose Roy。这两种方法都行得通,但我希望以某种方式在聚合中包含间隔分割而不是循环。我可能有数千个间隔要计算,并且觉得在聚合中执行它可能会更好?我将尝试使用循环方法,看看效果如何。
  • 我不确定你的意思。您是否给出了间隔列表,并且您需要根据此列表对灯光进行分组?您传入的间隔列表的格式是什么(例如,它是具有字段startend 的对象数组吗?
  • @DavidBlack 如果更新的答案对您有帮助,请告诉我。
  • 是的,我试过了,效果很好——非常感谢!我一直在努力寻找 MapReduce 优于 Aggregation 的用途,这展示了一个很好的例子,说明了为什么你可以使用它。
  • @DavidBlack 其实对你们俩来说。这是“接近”但仍然不是 100% 正确。循环是主要的,但缺少的是日期值的正确“四舍五入”,以及对可以在报告期之前和之后打开/关闭的灯的日期选择的考虑。此外,reduce 函数有点偏离,并且在任何超过一小组输入的情况下都会失败。但是我仍然认为这值得付出努力,因此得到了我的有用投票。
【解决方案3】:

一种方法是使用 $project 的 $cond 运算符,并将每个“start”和“end”与原始集合中的“on”和“off”字段进行比较。使用您的 MongoDB 客户端循环遍历每个存储桶并执行以下操作:

db.lights.aggregate([
    { "$project": { 
       "present": { "$cond": [
           { "$and": [ 
               { "$lte": [ "$on", ISODate("2015-01-01T06:00:00Z") ] },
               { "$gte": [ "$off", ISODate("2015-01-01T12:00:00Z") ] }
           ]},
           1, 
       0
       ]}
   }}
]);

结果应该是这样的:

{ "_id" : "light_1", "present" : 0 }
{ "_id" : "light_2", "present" : 0 }
{ "_id" : "light_3", "present" : 1 }

对于所有带有{"present":1} 的文档,将灯光集合的"_id" 添加到您的客户端的"lights_on" 字段中。希望这会有所帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-02-08
    • 1970-01-01
    • 2015-10-11
    • 2023-03-07
    • 2019-02-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多