【问题标题】:Moving averages with MongoDB's aggregation framework?使用 MongoDB 的聚合框架移动平均值?
【发布时间】:2014-08-06 01:42:41
【问题描述】:

如果您有 50 年的温度天气数据(每天)(例如),您将如何使用 3 个月的间隔计算该时间段的移动平均值?你可以用一个查询来做到这一点,还是必须有多个查询?

Example Data

01/01/2014 = 40 degrees
12/31/2013 = 38 degrees
12/30/2013 = 29 degrees
12/29/2013 = 31 degrees
12/28/2013 = 34 degrees
12/27/2013 = 36 degrees
12/26/2013 = 38 degrees
.....

【问题讨论】:

  • 你到底是什么意思?你想让某些值重叠吗?如果是这样,哪些?白天?或者只是一个滚动平均值。聚合框架无法真正将一个文档与另一个文档进行比较,所以这听起来更像是 mapReduce。
  • @neil-lunn 我想计算一个滚动平均值。所以对于 3 个月的间隔,我想用过去 3 个月的数据取一天的平均值,然后这样做在接下来的 50 年中,50 年的平均每一天。所以我认为某些值会与平均值重叠。您将如何使用 mapReduce 来做到这一点,而不是使用聚合框架来做到这一点。我认为您是对的,我必须比较单独的文件。谢谢!
  • @neil-lunn,看起来聚合框架无法做到这一点,你是对的......jira.mongodb.org/browse/SERVER-4437 .. 如果你知道如何使用 mongodb 的 mapreduce 来做到这一点,请告诉我跨度>
  • 另外我想创建这个移动或滚动平均数据数组 - 更多关于移动平均的信息在这里en.wikipedia.org/wiki/Moving_average

标签: mongodb aggregation-framework moving-average


【解决方案1】:

agg 框架现在内置了$map$reduce$range,因此数组处理更加直接。下面是计算一组数据的移动平均值的示例,您希望在其中按某些谓词进行过滤。基本设置是每个文档都包含可过滤的条件和一个值,例如

{sym: "A", d: ISODate("2018-01-01"), val: 10}
{sym: "A", d: ISODate("2018-01-02"), val: 30}

这里是:

// This controls the number of observations in the moving average:
days = 4;

c=db.foo.aggregate([

// Filter down to what you want.  This can be anything or nothing at all.
{$match: {"sym": "S1"}}

// Ensure dates are going earliest to latest:
,{$sort: {d:1}}

// Turn docs into a single doc with a big vector of observations, e.g.
//     {sym: "A", d: d1, val: 10}
//     {sym: "A", d: d2, val: 11}
//     {sym: "A", d: d3, val: 13}
// becomes
//     {_id: "A", prx: [ {v:10,d:d1}, {v:11,d:d2},  {v:13,d:d3} ] }
//
// This will set us up to take advantage of array processing functions!
,{$group: {_id: "$sym", prx: {$push: {v:"$val",d:"$date"}} }}

// Nice additional info.  Note use of dot notation on array to get
// just scalar date at elem 0, not the object {v:val,d:date}:
,{$addFields: {numDays: days, startDate: {$arrayElemAt: [ "$prx.d", 0 ]}} }

// The Juice!  Assume we have a variable "days" which is the desired number
// of days of moving average.
// The complex expression below does this in python pseudocode:
//
// for z in range(0, size of value vector - # of days in moving avg):
//    seg = vector[n:n+days]
//    values = seg.v
//    dates = seg.d
//    for v in seg:
//        tot += v
//    avg = tot/len(seg)
// 
// Note that it is possible to overrun the segment at the end of the "walk"
// along the vector, i.e. not enough date-values.  So we only run the
// vector to (len(vector) - (days-1).
// Also, for extra info, we also add the number of days *actually* used in the
// calculation AND the as-of date which is the tail date of the segment!
//
// Again we take advantage of dot notation to turn the vector of
// object {v:val, d:date} into two vectors of simple scalars [v1,v2,...]
// and [d1,d2,...] with $prx.v and $prx.d
//
,{$addFields: {"prx": {$map: {
    input: {$range:[0,{$subtract:[{$size:"$prx"}, (days-1)]}]} ,
    as: "z",
    in: {
       avg: {$avg: {$slice: [ "$prx.v", "$$z", days ] } },
       d: {$arrayElemAt: [ "$prx.d", {$add: ["$$z", (days-1)] } ]}
        }
        }}
    }}

            ]);

这可能会产生以下输出:

{
    "_id" : "S1",
    "prx" : [
        {
            "avg" : 11.738793632512115,
            "d" : ISODate("2018-09-05T16:10:30.259Z")
        },
        {
            "avg" : 12.420766702631376,
            "d" : ISODate("2018-09-06T16:10:30.259Z")
        },
        ...

    ],
    "numDays" : 4,
    "startDate" : ISODate("2018-09-02T16:10:30.259Z")
}

【讨论】:

  • 这个解决方案是否适用于大型数据集(比如 1000 万个文档)?
  • @JayeshSingh 取决于$group。如果您正在考虑创建一个包含 20m val:date 对的数组将超过 16m 文档的限制,那么您是对的。
【解决方案2】:

我倾向于在 MongoDB 中执行此操作的方式是在文档中为每一天的值维护过去 90 天的运行总和,例如

{"day": 1, "tempMax": 40, "tempMaxSum90": 2232}
{"day": 2, "tempMax": 38, "tempMaxSum90": 2230}
{"day": 3, "tempMax": 36, "tempMaxSum90": 2231}
{"day": 4, "tempMax": 37, "tempMaxSum90": 2233}

每当需要将一个新数据点添加到集合中时,您可以通过两个简单查询(一个加法和一个减法)有效地计算下一个总和,而不是读取和求和 90 个值,如下所示(伪代码):

tempMaxSum90(day) = tempMaxSum90(day-1) + tempMax(day) - tempMax(day-90)

每天的 90 天移动平均线就是 90 天的总和除以 90。

如果您还想提供不同时间尺度的移动平均线(例如 1 周、30 天、90 天、1 年),您可以简单地为每个文档维护一个总和数组,而不是一个总和,一个总和所需的每个时间尺度。

这种方法需要额外的存储空间和额外的处理来插入新数据,但适用于大多数时间序列图表场景,其中新数据的收集速度相对较慢并且需要快速检索。

【讨论】:

    【解决方案3】:

    接受的答案帮助了我,但我花了一段时间才理解它是如何工作的,所以我想我会解释我的方法来帮助别人。特别是在您的情况下,我认为我的回答会有所帮助

    理想情况下,这适用于较小的数据集

    首先将数据按天分组,然后将数组中的所有天附加到每一天:

    {
      "$sort": {
        "Date": -1
      }
    },
    {
      "$group": {
        "_id": {
          "Day": "$Date",
          "Temperature": "$Temperature"
        },
        "Previous Values": {
          "$push": {
            "Date": "$Date",
            "Temperature": "$Temperature"
          }
        }
      }
    

    这将为您留下如下所示的记录(将正确排序):

    {"_id.Day": "2017-02-01", 
    "Temperature": 40, 
    "Previous Values": [
        {"Day": "2017-03-01", "Temperature": 20},
        {"Day": "2017-02-11", "Temperature": 22},
        {"Day": "2017-01-18", "Temperature": 03},
        ...
        ]},
    

    现在每一天都附加了所有天,我们需要从 Previous Values 数组中删除比 this _id.Day 字段更新的项目,因为移动平均线是向后看的:

    {
      "$project": {
        "_id": 0,
        "Date": "$_id.Date",
        "Temperature": "$_id.Temperature",
        "Previous Values": 1
      }
    },
    {
      "$project": {
        "_id": 0,
        "Date": 1,
        "Temperature": 1,
        "Previous Values": {
          "$filter": {
            "input": "$Previous Values",
            "as": "pv",
            "cond": {
              "$lte": ["$$pv.Date", "$Date"]
            }
          }
        }
      }
    },
    

    Previous Values 数组中的每一项将仅包含小于或等于每条记录日期的日期:

    {"Day": "2017-02-01", 
    "Temperature": 40, 
    "Previous Values": [
        {"Day": "2017-01-31", "Temperature": 33},
        {"Day": "2017-01-30", "Temperature": 36},
        {"Day": "2017-01-29", "Temperature": 33},
        {"Day": "2017-01-28", "Temperature": 32},
        ...
        ]}
    

    现在我们可以选择我们的平均窗口大小,因为数据是按天计算的,对于一周我们将获取数组的前 7 条记录;每月,30;或 3 个月一次,90 天:

    {
      "$project": {
        "_id": 0,
        "Date": 1,
        "Temperature": 1,
        "Previous Values": {
          "$slice": ["$Previous Values", 0, 90]
        }
      }
    },
    

    为了平均以前的温度,我们展开以前的值数组,然后按日期字段分组。展开操作是这样的:

    {"Day": "2017-02-01", 
    "Temperature": 40, 
    "Previous Values": {
            "Day": "2017-01-31", 
            "Temperature": 33}
    },
    
    {"Day": "2017-02-01", 
    "Temperature": 40, 
    "Previous Values": {
            "Day": "2017-01-30", 
            "Temperature": 36}
    },
    
    {"Day": "2017-02-01", 
    "Temperature": 40, 
    "Previous Values": {
            "Day": "2017-01-29", 
            "Temperature": 33}
    },
    ...
    

    看到 Day 字段是相同的,但我们现在从 Previous Values 数组中获得了每个先前日期的文档。 现在我们可以按天分组,然后平均 Previous Values.Temperature 以获得移动平均值:

    {"$group": {
        "_id": {
          "Day": "$Date",
          "Temperature": "$Temperature"
        },
        "3 Month Moving Average": {
          "$avg": "$Previous Values.Temperature"
        }
      }
    }
    

    就是这样!我知道将每条记录连接到每条记录并不理想,但这适用于较小的数据集

    【讨论】:

      【解决方案4】:

      Mongo 5 开始,这是新的$setWindowFields 聚合运算符的完美用例:

      请注意,为简单起见,我认为滚动平均值有一个 3 天的窗口(今天和前 2 天):

      // { date: ISODate("2013-12-26"), temp: 38 }
      // { date: ISODate("2013-12-27"), temp: 36 }
      // { date: ISODate("2013-12-28"), temp: 34 }
      // { date: ISODate("2013-12-29"), temp: 31 }
      // { date: ISODate("2013-12-30"), temp: 29 }
      // { date: ISODate("2013-12-31"), temp: 38 }
      // { date: ISODate("2014-01-01"), temp: 40 }
      db.collection.aggregate([
        { $setWindowFields: {
          sortBy: { date: 1 },
          output: {
            movingAverage: {
              $avg: "$temp",
              window: { range: [-2, "current"], unit: "day" }
            }
          }
        }}
      ])
      // { date: ISODate("2013-12-26"), temp: 38, movingAverage: 38 }
      // { date: ISODate("2013-12-27"), temp: 36, movingAverage: 37 }
      // { date: ISODate("2013-12-28"), temp: 34, movingAverage: 36 }
      // { date: ISODate("2013-12-29"), temp: 31, movingAverage: 33.67 }
      // { date: ISODate("2013-12-30"), temp: 29, movingAverage: 31.33 }
      // { date: ISODate("2013-12-31"), temp: 38, movingAverage: 32.67 }
      // { date: ISODate("2014-01-01"), temp: 40, movingAverage: 35.67 }
      

      这个:

      • 按时间顺序排序文档:sortBy: { date: 1 }
      • 为每个文档创建一个文档范围(window):
        • 包括"current" 文档和"2"-"day" 窗口中的所有以前的文档
      • 在该窗口内,平均温度:$avg: "$temp"

      【讨论】:

        【解决方案5】:

        我想我可能对我自己的问题有一个答案。 Map Reduce 会做到这一点。首先使用 emit 将每个文档映射到它应该被平均的邻居,然后使用 reduce 来平均每个数组......并且新的平均值数组应该是移动平均线图加班,因为它的 id 将是新的日期间隔你在乎

        我想我需要更好地理解 map-reduce ...

        :)

        例如...如果我们想在内存中进行(稍后我们可以创建集合)

        GIST https://gist.github.com/mrgcohen/3f67c597a397132c46f7

        看起来对吗?

        【讨论】:

        • 好的,做了一些调整,但我很确定整体想法可以做到。你可能需要调整间隔到你想要的,但它应该可以工作。
        • problem .. 这将在一个巨大的数据集上运行多快,听起来 mongo 很慢,除非你开始分片......这里的最佳实践是什么?帮助
        【解决方案6】:

        我不相信聚合框架可以在当前版本 (2.6) 中为多个日期执行此操作,或者至少在没有一些严肃的体操的情况下无法执行此操作。原因是聚合管道一次处理一个文档并且只处理一个文档,因此有必要以某种方式为每天创建一个包含前 3 个月相关信息的文档。这将作为一个$group 阶段计算平均值,这意味着前一个阶段将生成大约 90 个每天记录的副本,其中包含一些可用于$group 的区别键。

        因此,我看不到在一个聚合中一次在多个日期执行此操作的方法。如果有人找到方法,我很乐意犯错并且必须编辑/删除这个答案,即使它非常复杂,不切实际。一个 PostgreSQL PARTITION 类型的函数可以在这里完成工作;也许有一天会添加这个功能。

        【讨论】:

        • 那么您必须查询其中的一部分并以某种语言(ruby、python、node)计算移动平均值,或者为每个间隔运行聚合查询......真的吗?最好的解决方案?不就是感觉不对吗?有没有更好的方法来使用我想不到的 map-reduce?
        • 我还没有考虑过 map-reduce。通常,我会尽量避免使用 map-reduce,因为它会显着降低性能,并且由于您在服务器端运行自定义代码,因此不那么安全。我会试着考虑一下,或者也许其他人会想出一个 m/r 解决方案。
        • 您绝对应该能够进行 M/R,但我目前没有时间解决。我会努力解决并更新我的答案。
        • 自 v3.4(2016 年 12 月)起不再是问题;请参阅上面的 $map/$range 示例。
        猜你喜欢
        • 2012-12-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-08-12
        • 2012-10-11
        相关资源
        最近更新 更多