这是一个很难理解的问题,但这里的“聚合框架”存在一个主要问题,主要是您的“活动”根据其当前状态存在于几个不同的时间间隔中。
这意味着它始终存在一个“开始”和一个“完成”区间,以及可能的“几个”区间,其中任务可以被视为“正在执行”。
聚合框架不能真正一次性做到这一点。但是你可以用 mapReduce 做到这一点:
db.test.mapReduce(
function() {
// Work out time values
var finished = this.timestamp.valueOf() + this.duration,
finishedInterval = finished -
( finished % ( 1000 * 60 * 10 ) ),
interval = this.timestamp.valueOf() -
( this.timestamp.valueOf() % ( 1000 * 60 * 10 ) );
// Emit initialized
emit(
{
"year": new Date(interval).getUTCFullYear(),
"month": new Date(interval).getUTCMonth()+1,
"day": new Date(interval).getUTCDate(),
"hour": new Date(interval).getUTCHours(),
"minute": new Date(interval).getUTCMinutes()
},
{
"Initialized": 1,
"InExecution": 0,
"Finshed": 0
}
);
// Emit finished
emit(
{
"year": new Date(finishedInterval).getUTCFullYear(),
"month": new Date(finishedInterval).getUTCMonth()+1,
"day": new Date(finishedInterval).getUTCDate(),
"hour": new Date(finishedInterval).getUTCHours(),
"minute": new Date(finsihedInterval).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 0,
"Finshed": 1
}
);
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; x<finishedInterval; x+= ( 1000 * 60 * 10 ) ) {
emit(
{
"year": new Date(x).getUTCFullYear(),
"month": new Date(x).getUTCMonth()+1,
"day": new Date(x).getUTCDate(),
"hour": new Date(x).getUTCHours(),
"minute": new Date(x).getUTCMinutes()
},
{
"Initialized": 0,
"InExecution": 1,
"Finshed": 0
}
);
}
}
},
function(key,values) {
var result = { "Initialized": 0, "InExecution": 0, "Finshed": 0 };
values.forEach(function(value) {
Object.keys(value).forEach(function(key) {
result[key] += value[key];
});
});
return result;
},
{
"out": { "inline": 1 },
"query": { "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } }
}
)
如您所见,大部分工作都是在映射器中完成的。这基本上可以计算出任务“开始”和“结束”的时间间隔,并为此发出适当的数据。
当然,通过从任务的“开始”间隔开始工作,每 10 分钟间隔发出一个“执行中”计数,而该值小于任务的“结束”间隔。
reducer 只是简单地获取每个间隔的所有发出的计数并将它们相加。所以这是一个非常简单的操作。
map 和 reduce 逻辑是合理的,但查询选择逻辑存在问题,即“正在完成”或“正在执行”的作业可能会在第一次查询时间之前启动。
为了做到这一点,您需要修复该查询选择以考虑这一点,并且由于您不存储“完成”时间,因此您需要计算它,这意味着使用 $where 在查询中进行 JavaScript 评估:
{
"out": { "inline": 1 },
"query": {
"$where": function() {
return (this.timestamp >= new Date("2015-04-27T12:00:00Z") ||
new Date(this.timestamp.valueOf() + this.duration) >=
new Date("2015-04-27T12:00:00Z"))
}
}
}
这会选取在查询开始时间之前仍在运行或在当时完成的项目。
这不是很好,因为它会扫描集合,因此最好将“finshed”作为值包含在数据中以使查询选择更容易:
{
"out": { "inline": 1 },
"query": {
"$or": [
{ "timestamp": { "$gte": new Date("2015-04-27T12:00:00Z") } },
{ "finished": { "$gte": new Date("2015-04-27T12:00:00Z") } }
]
}
}
这可以使用“索引”并且速度更快。
最后,这里仍然会在“时间戳”过滤器值“之前”发出值,因为任何一种形式的“完成”都意味着在该时间之前开始的任务。出于同样的原因,最好在查询条件和逻辑上设置“结束”时间。
为此再次更改选项块以包含要在执行逻辑中使用的“范围”变量,并添加到“查询”条件:
{
"out": { "inline": 1 },
"query": {
"$or": [
{
"timestamp": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
},
{
"finished": {
"$gte": new Date("2015-04-27T12:00:00Z"),
"$lt": new Date("2015-04-28T12:00:00Z")
}
}
]
},
"scope": {
"start": new Date("2015-04-27T12:00:00Z"),
"finsh": new Date("2015-04-28T12:00:00Z")
}
}
然后在每个发射周围添加条件,首先是“interval”大于“start”的started:
// Emit initialized
if ( interval >= start.valueOf() ) {
emit(
并且在“finishedInterval”小于“finish”的地方完成:
// Emit finished
if ( finishedInterval <= finish.valueOf() ) {
emit(
然后将循环限制在“执行中”:
// Emit In execution for every 10 minute interval until finished
if ( ( interval + ( 1000 * 60 * 10 ) ) < finishedInterval ) {
for ( var x = interval; (( x<finishedInterval ) && ( x<finish.valueOf() )); x+= ( 1000 * 60 * 10 ) ) {
if ( x > start.valueOf() ) {
emit(
这为您提供了一个清晰的起点和终点,同时将所有可能的统计信息列在结果中。