【问题标题】:How to aggregate the result of aggregation in Elasticsearch?如何在 Elasticsearch 中聚合聚合的结果?
【发布时间】:2019-12-23 20:34:40
【问题描述】:

我想使用 Elasticsearch 聚合其他聚合的结果。我已经创建了我需要的第一个聚合:

es.search(index='stackoverflow', body = {
    "size":0,
    "query": {
        "bool": {
          "filter": {
              "match" : {"type": "Posts"}
          },
          "filter": {
              "match" : {"PostTypeId": "1"}
          }
        }
    },
    "aggs" : {
        "by_user": {
          "terms": {
            "field": "OwnerUserId"
          }
        }
    }
})

此查询获取所有类型为 post 的文档,即问题 (PostTypeId = 1)。然后,通过OwnerUserId进行聚合,统计每个用户的问题发帖数,得到如下结果:

{'took': 0,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': None,
  'hits': []},
 'aggregations': {'by_user': {'doc_count_error_upper_bound': 0,
   'sum_other_doc_count': 31053,
   'buckets': [{'key': '2230', 'doc_count': 223},
    {'key': '', 'doc_count': 177},
    {'key': '38304', 'doc_count': 158},
    {'key': '5997', 'doc_count': 144},
    {'key': '4048', 'doc_count': 130},
    {'key': '25813', 'doc_count': 119},
    {'key': '27826', 'doc_count': 119},
    {'key': '2633', 'doc_count': 115},
    {'key': '19919', 'doc_count': 114},
    {'key': '13938', 'doc_count': 111}]}}}

现在我想对前一个结果进行另一个聚合:按 doc_count 聚合,我的意思是对相同数量的问题帖子进行分组和计数。对于之前的结果,我想要的结果是:

{'buckets': [{'key': '223', 'doc_count': 1},
    {'key': '177', 'doc_count': 1},
    {'key': '158', 'doc_count': 1},
    {'key': '144', 'doc_count': 1},
    {'key': '130', 'doc_count': 1},
    {'key': '119', 'doc_count': 2},
    {'key': '115', 'doc_count': 1},
    {'key': '114', 'doc_count': 1},
    {'key': '111', 'doc_count': 1}]}

【问题讨论】:

    标签: elasticsearch elasticsearch-aggregation


    【解决方案1】:

    我可以找到一种方法来聚合聚合结果(至少直接聚合)。正如我在 Elasticsearch 论坛中所读到的,没有考虑这个用例,因为它效率太低了。

    为了解决我的用例,我所做的是利用转换 API 将第一个聚合存储在时间索引中,然后对该索引执行第二个聚合。

    首先我创建一个转换来执行第一次聚合(按 OwnerUserId 分组并计算每个用户发布的问题数量):

    url = 'http://localhost:9200/_transform/transform_rq1'
    headers = {
       'Content-Type': 'application/json'
    }
    query = {
      "source": {
        "index": "posts",
        "query": {
            "bool": {
              "filter": {
                  "match" : {"PostTypeId": "1"}
              }
            }
        }
      },
      "dest": {
        "index": "rq1"
      },
      "pivot": {
        "group_by": {
          "OwnerUserId": {
            "terms": {
              "field": "OwnerUserId"
            }
          }
        },
        "aggregations": {
          "count": {
            "value_count": {
              "field": "OwnerUserId"
            }
          }
        }
      }
    }
    
    response = requests.put(url, headers=headers, data=json.dumps(query))
    

    然后,我启动转换以执行它:

    url = 'http://localhost:9200/_transform/transform_rq1/_start'
    headers = {
       'Content-Type': 'application/json'
    }
    
    response = requests.post(url, headers=headers).json()
    

    最后,我在创建的时间索引上执行第二次聚合(按每个用户的问题数量分组以获得多少用户发布多少问题):

    response = es.search(index='rq1', body = {
        "size":0,
        "query": {
                    "match_all": {}
                 },
        "aggs" : {
            "by_num": {
              "terms": {
                "field": "count",
                "order" : { "_key" : "asc" },
                "size": 30000
              }
            }
        }
    })
    
    print(response)
    

    如您所见,我用 Python 编写了这段代码。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-10-22
      • 2018-12-11
      • 2019-08-22
      • 1970-01-01
      • 2015-10-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多