【问题标题】:ElasticSearch mapping the result of collapse / do operations on a grouped documentsElasticSearch 映射折叠/对分组文档执行操作的结果
【发布时间】:2020-06-24 07:26:45
【问题描述】:

有一个对话列表,每个对话都有一个消息列表。每条消息都有不同的字段和action 字段。我们需要考虑在对话的第一条消息中使用了动作A,在几条消息之后使用了动作A.1,过了一会儿A.1.1等等(有一个聊天机器人意图列表)。

对对话的消息操作进行分组将类似于:A > A > A > A.1 > A > A.1 > A.1.1 ...

问题:

我需要使用 ElasticSearch 创建一个报告,该报告将返回每个对话的actions group;接下来,我需要对类似的actions groups 进行分组添加计数;最后将导致Map<actionsGroup, count>'A > A.1 > A > A.1 > A.1.1', 3

构造actions group我需要消除每组重复;我需要A > A.1 > A > A.1 > A.1.1 而不是A > A > A > A.1 > A > A.1 > A.1.1

我开始做的步骤

{
   "collapse":{
      "field":"context.conversationId",
      "inner_hits":{
         "name":"logs",
         "size": 10000,
         "sort":[
            {
               "@timestamp":"asc"
            }
         ]
      }
   },
   "aggs":{
   },
}

接下来我需要什么:

  1. 我需要将折叠结果映射到单个结果中,例如A > A.1 > A > A.1 > A.1.1。我已经看到在这种情况下或aggr 可以在结果上使用scripts,并且可以创建我需要的操作列表,但是aggr 正在对所有消息进行操作,不仅在我崩溃的分组消息上。是否可以在折叠或类似解决方案中使用aggr
  2. 我需要对所有折叠的结果值 (A > A.1 > A > A.1 > A.1.1) 进行分组,添加计数并生成 Map<actionsGroup, count>

或者:

  1. 使用aggrconversationId 字段对对话消息进行分组(我不知道该怎么做)
  2. 使用脚本迭代所有值并为每个对话创建actions group。 (不确定这是否可能)
  3. 对所有值使用另一个aggr 并对重复项进行分组,返回Map<actionsGroup, count>

更新 2: 我设法获得了部分结果,但仍然存在一个问题。请检查here 我还需要修复什么。

更新 1:添加一些额外的细节

映射:

"mappings":{
  "properties":{
     "@timestamp":{
        "type":"date",
        "format": "epoch_millis"
     }
     "context":{
        "properties":{
           "action":{
              "type":"keyword"
           },
           "conversationId":{
              "type":"keyword"
           }
        }
     }
  }
}

对话文件示例:

Conversation 1.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id1",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id1",
    }
}

Conversation 2.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "A",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "A.1",
        "conversationId": "conv_id2",
    }
},
{
    "@timestamp": 1579632745002,
    "context": {
        "action": "A.1.1",
        "conversationId": "conv_id2",
    }
}

Conversation 3.
{
    "@timestamp": 1579632745000,
    "context": {
        "action": "B",
        "conversationId": "conv_id3",
    }
},
{
    "@timestamp": 1579632745001,
    "context": {
        "action": "B.1",
        "conversationId": "conv_id3",
    }
}

预期结果:

{
    "A -> A.1 -> A.1.1": 2,
    "B -> B.1": 1
}
Something similar, having this or any other format.

因为我是 elasticsearch 新手,所以我非常欢迎每个提示。

【问题讨论】:

  • 您可以使用术语聚合对键进行分组。如果可以添加映射、示例文档和预期结果,将更容易理解问题
  • @jaspreetchahal 我添加了一些额外的细节。
  • 您是否需要 convs1->2、convs2->1 的会话 ID 数?行动组在这里的作用是什么?
  • 没有。我需要会话计数actions group。就像每个对话都有一个动作列表A -> A.1 -> A.1.1,这是actions group;我需要知道actions group 的数量。
  • 我建议你放弃聚合并自己编写一个完整的脚本。 elastic.co/guide/en/elasticsearch/reference/master/…

标签: elasticsearch collapse elasticsearch-aggregation elasticsearch-query


【解决方案1】:

我使用弹性的scripted_metric 解决了它。此外,index 已从初始状态更改。

脚本:

{
   "size": 0,
   "aggs": {
        "intentPathsCountAgg": {
            "scripted_metric": {
                "init_script": "state.messagesList = new ArrayList();",
                "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",  
                "combine_script": "return state",
                "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
            }
        }
    }
}

格式化脚本(为了更好的可读性 - 使用 .ts):

scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];

    for (state in states) {
      messages.addAll(state.messagesList);
    }

    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if(existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });

    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });

    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`

答案:

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 11,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "intentPathsCountAgg": {
            "value": [
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
                },
                {
                    "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
                },
                {
                    "smallTalk.greet -> smallTalk.greet2": 1
                }
            ]
        }
    }
}

【讨论】:

    【解决方案2】:

    使用Terms aggregation 中的脚本,我们可以在“context.action”的第一个字符上创建存储桶。使用相似术语子聚合 我们可以得到父桶ex A-> A.1->A.1.1 ...

    下的所有“context.action”

    查询:

    {
      "size": 0,
      "aggs": {
        "conversations": {
          "terms": {
            "script": {
              "source": "def term=doc['context.action'].value; return term.substring(0,1);" 
    --->  returns first character ex A,B,C etc
            },
            "size": 10
          },
          "aggs": {
            "sub_conversations": {
              "terms": {
                "script": {
                  "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"--> All context.action under [A], length check to ignore [A]
                },
                "size": 10
              }
            },
            "count": {
              "cardinality": {
                "script": {
                  "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"--> count of all context.action under A
                }
              }
            }
          }
        }
      }
    }
    

    由于在弹性搜索中无法加入不同的文档。您必须通过迭代聚合存储桶在客户端获取组合键。

    结果:

      "aggregations" : {
        "conversations" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "A",
              "doc_count" : 6,
              "sub_conversations" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 0,
                "buckets" : [
                  {
                    "key" : "A.1",
                    "doc_count" : 2
                  },
                  {
                    "key" : "A.1.1",
                    "doc_count" : 2
                  }
                ]
              },
              "count" : {
                "value" : 2
              }
            },
            {
              "key" : "B",
              "doc_count" : 2,
              "sub_conversations" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 0,
                "buckets" : [
                  {
                    "key" : "B.1",
                    "doc_count" : 1
                  }
                ]
              },
              "count" : {
                "value" : 1
              }
            }
          ]
        }
      }
    

    【讨论】:

    • 这不是我真正需要的。由于性能原因,我需要从 elasticsearch 中获得几乎完整的结果。
    • 我设法为我的问题获得了部分结果,但仍然存在一个问题。如果您有任何想法,请查看此帖子。 stackoverflow.com/questions/60662222
    猜你喜欢
    • 1970-01-01
    • 2018-12-08
    • 1970-01-01
    • 2016-05-14
    • 2019-12-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多