【Question Title】: Transform ElasticSearch index from field explosions into nested documents via Logstash
【Posted】: 2018-02-06 04:09:11
【Question Description】:

So we have a legacy Elasticsearch index that succumbed to field explosion. We have redesigned the structure of the index to fix this using nested documents, but we are trying to figure out how to migrate the old index data into the new structure. We are currently looking at Logstash plugins, in particular the aggregate filter, to try to accomplish this. However, all the examples we can find show how to create nested documents from database calls, not from a field-exploded index. For context, here is an example document from the old index:

"assetID": 22074,
"metadata": {
  "50": {
    "analyzed": "Phase One",
    "full": "Phase One",
    "date": "0001-01-01T00:00:00"
  },
  "51": {
    "analyzed": "H 25",
    "full": "H 25",
    "date": "0001-01-01T00:00:00"
  },
  "58": {
    "analyzed": "50",
    "full": "50",
    "date": "0001-01-01T00:00:00"
  }
}

And here is what we would like the transformed data to look like in the end:

"assetID": 22074,
"metadata": [{
    "metadataId": 50,
    "ngrams": "Phase One", //This was "analyzed"
    "alphanumeric": "Phase One", //This was "full"
    "date": "0001-01-01T00:00:00"
  }, {
    "metadataId": 51,
    "ngrams": "H 25", //This was "analyzed"
    "alphanumeric": "H 25", //This was "full"
    "date": "0001-01-01T00:00:00"
  }, {
    "metadataId": 58,
    "ngrams": "50", //This was "analyzed"
    "alphanumeric": "50", //This was "full"
    "date": "0001-01-01T00:00:00"
  }
]
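For this target shape to be queryable per-entry, the redesigned index has to map `metadata` as a `nested` type. A minimal mapping sketch follows; the field datatypes here are assumptions rather than anything stated in the question, and the `searchasset` type name is borrowed from the accepted answer's output block:

```json
{
  "mappings": {
    "searchasset": {
      "properties": {
        "assetID": { "type": "long" },
        "metadata": {
          "type": "nested",
          "properties": {
            "metadataId":   { "type": "integer" },
            "ngrams":       { "type": "text" },
            "alphanumeric": { "type": "keyword" },
            "date":         { "type": "date" }
          }
        }
      }
    }
  }
}
```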

As a simple example, here is what we have pieced together so far from the aggregate filter plugin:

input {
  elasticsearch {
    hosts => "my.old.host.name:9266"
    index => "my-old-index"
    query => '{"query": {"bool": {"must": [{"term": {"_id": "22074"}}]}}}'  
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
   aggregate {
    task_id => "%{assetID}"

    code => "     
      map['assetID'] = event.get('assetID')
      map['metadata'] ||= []
      map['metadata'] << {
        'metadataId' => ?, # somehow parse the id out of the exploded field name "metadata.#.full"
        'ngrams' => event.get('metadata.#.analyzed'),
        'alphanumeric' => event.get('metadata.#.full'),
        'date' => event.get('metadata.#.date')
      }
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']    
  } 

   if "aggregated" not in [tags] {
    drop {}
  }

}

output {
  elasticsearch {
    hosts => "my.new.host:9266"
    index => "my-new-index"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    action => "update"
  }

  file {
    path => "C:\apps\logstash\logstash-5.6.6\testLog.log"
  }  
}

Obviously, the example above is essentially just pseudocode, but it is all we have been able to glean from the documentation for Logstash and Elasticsearch, the aggregate filter plugin, and from generally Googling these things within an inch of their lives.
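One point worth noting about the pseudocode: the id never has to be parsed out of a flattened field name like `metadata.#.full`, because the event's `metadata` value arrives as a hash whose keys are the ids themselves. A minimal Ruby sketch (the sample data mirrors the question's document):

```ruby
# The old document's "metadata" object is a hash keyed by metadata id,
# so iterating its keys yields each id directly -- no string parsing needed.
metadata = {
  '50' => { 'analyzed' => 'Phase One', 'full' => 'Phase One' },
  '51' => { 'analyzed' => 'H 25',      'full' => 'H 25' }
}

ids = metadata.keys.map(&:to_i)
puts ids.inspect  # prints [50, 51]
```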

【Question Discussion】:

    Tags: elasticsearch migration mapping logstash nested-documents


    【Solution 1】:

    You can play around with the event object, massage it, and then add it to the new index. Something like the below (the Logstash code is untested, so you may find some errors; see the working Ruby code after this section):

     aggregate {
        task_id => "%{assetID}"

        code => "
          map['assetID'] = event.get('assetID')

          arr = Array.new()
          metadataObj = event.get('metadata')
          metadataObj.to_hash.each do |key, value|
            transformedMetadata = {}
            transformedMetadata['metadataId'] = key

            value.to_hash.each do |k, v|
              if k == 'analyzed' then
                transformedMetadata['ngrams'] = v
              elsif k == 'full' then
                transformedMetadata['alphanumeric'] = v
              else
                transformedMetadata['date'] = v
              end
            end
            arr.push(transformedMetadata)
          end

          map['metadata'] ||= []
          map['metadata'].concat(arr)
        "
      }
    

    Try the approach above against your event input and you will get there. Here is a working example, using the input you provided in the question, for you to play with: https://repl.it/repls/HarshIntelligentEagle

    json_data = {"assetID": 22074,
    "metadata": {
      "50": {
        "analyzed": "Phase One",
        "full": "Phase One",
        "date": "0001-01-01T00:00:00"
      },
      "51": {
        "analyzed": "H 25",
        "full": "H 25",
        "date": "0001-01-01T00:00:00"
      },
      "58": {
        "analyzed": "50",
        "full": "50",
        "date": "0001-01-01T00:00:00"
      }
    }
    }
    
    arr = Array.new()
    transformedObj = {}
    transformedObj["assetID"] = json_data[:assetID]
    
    
    json_data[:metadata].to_hash.each do |key,value|  
      transformedMetadata = {}
      transformedMetadata["metadataId"] = key  
      
      value.to_hash.each do |k , v|
      
        if k == :analyzed then
           transformedMetadata["ngrams"] = v
        elsif k == :full then
           transformedMetadata["alphanumeric"] = v
        else
           transformedMetadata["date"] = v
        end
      end
      arr.push(transformedMetadata)
    end
    transformedObj["metadata"] = arr
    
    puts transformedObj

    【Discussion】:

      【Solution 2】:

      In the end, we solved it with Ruby code in a script:

      # Must use the input plugin for elasticsearch at version 4.0.2, or it cannot contact a 1.X index
      input {
        elasticsearch {
          hosts => "my.old.host.name:9266"
          index => "my-old-index"
          query => '{
            "query": {
              "bool": {
                "must": [
                  { "match_all": { } }
                ]
              }
            }
          }' 
          size => 500
          scroll => "5m"
          docinfo => true
        }
      }
      
      filter {
        mutate {
          remove_field => ['@version', '@timestamp']
        }
      }
      
      #metadata
      filter {
        mutate {
          rename => { "[metadata]" => "[metadata_OLD]" }
        }
      
        ruby {
          code => "
            metadataDocs = []
            metadataFields = event.get('metadata_OLD')
      
            metadataFields.each { |key, value|
              metadataDoc = {
                'metadataId' => key.to_i,
                'date' => value['date']
              }
      
              if !value['full'].nil?
                metadataDoc['alphanumeric'] = value['full']
              end
      
              if !value['analyzed'].nil?
                metadataDoc['ngrams'] = value['analyzed']
              end
      
              metadataDocs << metadataDoc
            }
      
            event.set('metadata', metadataDocs)
          "
        }
      
        mutate {
          remove_field => ['metadata_OLD']
        }
      }
      
      output {
        elasticsearch {
          hosts => "my.new.host:9266"
          index => "my-new-index"
          document_type => "searchasset"
          document_id => "%{assetID}"
          action => "update"
          doc_as_upsert => true
        }
        file {
          path => "F:\logstash-6.1.2\logs\esMigration.log"
        }  
      }
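The transformation inside the `ruby` filter above can be sanity-checked outside Logstash with plain Ruby. This sketch reproduces the same logic on a sample hash shaped like the old index's `metadata` object (sample values are taken from the question; field names follow the question's target schema):

```ruby
# Replicate the ruby-filter body on a plain hash shaped like the
# old index's "metadata" object.
old_metadata = {
  '50' => { 'analyzed' => 'Phase One', 'full' => 'Phase One', 'date' => '0001-01-01T00:00:00' },
  '58' => { 'analyzed' => '50',        'full' => '50',        'date' => '0001-01-01T00:00:00' }
}

metadata_docs = []
old_metadata.each do |key, value|
  doc = { 'metadataId' => key.to_i, 'date' => value['date'] }
  # "full" becomes "alphanumeric", "analyzed" becomes "ngrams";
  # skip either field when the old document lacks it.
  doc['alphanumeric'] = value['full'] unless value['full'].nil?
  doc['ngrams'] = value['analyzed'] unless value['analyzed'].nil?
  metadata_docs << doc
end

puts metadata_docs.length  # prints 2
```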
      

      【Discussion】:
