【问题标题】:Updating complex nested elasticsearch document using logstash and jdbc使用 logstash 和 jdbc 更新复杂的嵌套弹性搜索文档
【发布时间】:2016-04-25 01:30:59
【问题描述】:

假设 Oracle Schema 有以下表和列:

国家 国家ID; (首要的关键) 国家的名字; 部 部门ID; (首要的关键) 部门名称; 国家ID; (国家的外键:country_id) 员工 员工ID; (首要的关键) 员工姓名; 部门ID; (部门的外键:department_id)

我有我的 Elasticsearch 文档,其中根元素是 Country 并且它包含 该国家/地区的所有部门,这些部门又包含各个部门的所有员工。

所以文档结构是这样的:

{ “映射”:{ “国家”: { “特性”: { "country_id": { "type": "string"}, “国家名称”:{“类型”:“字符串”}, “部”: { “类型”:“嵌套”, “特性”: { “部门ID”:{“类型”:“字符串”}, “部门名称”:{“类型”:“字符串”}, “员工”: { “类型”:“嵌套”, “特性”: { “employee_id”:{“类型”:“字符串”}, “员工姓名”:{“类型”:“字符串”} } } } } } } } }

我希望能够在每个表上运行单独的输入 jdbc 查询,它们应该创建/更新/删除 每当添加/更新/删除基表中的数据时,elasticsearch 文档中的数据。

这是一个示例问题,实际的表和数据结构更复杂。所以我不是在寻找解决方案 仅限于此。

有没有办法做到这一点?

谢谢。

【问题讨论】:

  • 我猜您可能已经解决了这个问题,但是,您能否使用 Oracle 视图将所需数据组合成文档结构格式(国家、部门、员工)并将其作为单个 JDBC 查询,这样您就可以将 elasticsearch 文档 ID 创建为最低唯一级别(在本例中为employee_id)并在那里管理更改?

标签: jdbc elasticsearch logstash


【解决方案1】:

对于第一级,直接使用aggregate filter。您需要在它们之间有一个共同的 id 才能引用。

filter {    

  aggregate {
    task_id => "%{id}"

    code => "     
      map['id'] = event.get('id')
      map['department'] ||= []
      map['department'] << event.to_hash.each do |key,value| { key => value } end    
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']    
  } 

   if "aggregated" not in [tags] {
    drop {}
  }
}

重要:输出动作应该是更新

    output {
        elasticsearch {
            action => "update"
             ...
           }
        }

解决级别 2 的一种方法是查询已编入索引的文档并使用嵌套记录对其进行更新。 再次使用aggregate filter;文档应该有一个通用 ID,以便您可以查找并插入到正确的文档中。

filter {    
    #get the document from elastic based on id and store it in 'emp'
    elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}/${INDEX_NAME}/${INDEX_TYPE}"]
            query => "id:%{id}" 
            fields => { "employee" => "emp" }
         }



  aggregate {
    task_id => "%{id}"  
    code => "       
                map['id'] = event.get('id')
                map['employee'] = []
                employeeArr = []
                temp_emp = {}   

                event.to_hash.each do |key,value|                       
                    temp_emp[key] = value
                end     

                #push the objects into an array
                employeeArr.push(temp_emp)

                empArr = event.get('emp')                   

                for emp in empArr
                    emp['employee'] = employeeArr                       
                    map['employee'].push(emp)
                end
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']

  } 

   if "aggregated" not in [tags] {
    drop {}
  } 

}

output {

elasticsearch {
        action => "update"    #important
         ...
        }
 }  

另外,为了调试 ruby​​ 代码,在输出中使用下面的代码

output{
    stdout { codec => dots }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多