【问题标题】:How to delete all documents in elasticsearch with logstash from a search如何使用logstash从搜索中删除elasticsearch中的所有文档
【发布时间】:2020-06-13 06:34:02
【问题描述】:

我正在使用 logstash 将数据传递给 elasticsearch,我想知道如何删除所有文档。

我这样做是为了删除那些带有id的,但我现在需要的是删除所有匹配固定值的文档,例如Fixedfield =“Base1”,无论jdbc输入中获得的id是否存在或不是。

这个想法是删除所有 elasticsearch fixedField = "Base1" 存在的文档,并插入我从 jdbc 输入中获得的新文档,这样我可以避免留下不再存在于我的源(jdbc 输入)中的文档。 一个更完整的例子

我的document_id形成:001、002、003等

我的固定字段由三个document_id的“Base1”组成

有什么想法吗?

input {
  jdbc {
  jdbc_driver_library => ""
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
  statement => "Select * from public.test"
  }
}

filter {
if [is_deleted] {
        mutate {    
            add_field => {
                "[@metadata][elasticsearch_action]" => "delete"
            }
        }
        mutate {
            remove_field => [ "is_deleted","@version","@timestamp" ]
        }
    } else {
        mutate {    
            add_field => {
                "[@metadata][elasticsearch_action]" => "index"
            }
        }
        mutate {
            remove_field => [ "is_deleted","@version","@timestamp" ]
        }
    } 
}

output {
  elasticsearch {
    hosts => "xxxxx"
    user => "xxxxx"
    password => "xxxxx"
    index => "xxxxx"
    document_type => "_doc"
    document_id => "%{id}"
  }
  stdout { codec => rubydebug }
}

【问题讨论】:

    标签: elasticsearch logstash


    【解决方案1】:

    我终于设法消除了,但是.....我现在遇到的问题显然是当输入开始时,它会计算它获得的记录数,当它继续朝向输出时,它会在第一轮消除,然后在以下n-1轮显示错误信息:

    [HTTP 输出失败] 遇到非 2xx HTTP 代码 409 {:response_code=>409, :url=>"http://localhost:9200/my_index/_delete_by_query",

    另一个,我认为可能发生的情况是_delete_by_query不是批量删除,而是查询/删除,这会导致查询返回n个结果,因此尝试删除n次。

    任何想法我可以如何迭代一次或如何避免该错误? 我澄清一下,错误不仅显示一次,而是显示要删除的文档数n-1次

    input {
      jdbc {
      jdbc_driver_library => ""
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://xxxxx;databaseName=xxxx;"
      statement => "Select * from public.test"
      }
    }
    
    
    
    output {
       stdout { codec => json_lines }
    
      elasticsearch {
            hosts => "localhost:9200"
            index => "%{[@metadata][miEntidad]}"
            document_type => "%{[@metadata][miDocumento]}"
            document_id => "%{id}"
      }
    
    
     http {
          url => "http://localhost:9200/my_index/_delete_by_query"
          http_method => "post"
          format => "message"
          content_type => "application/json; charset=UTF-8"
          message => '{"query": { "term": { "properties.codigo.keyword": "TEX_FOR_SEARCH_AND_DELETE"  } }}'
      }
     }
    

    【讨论】:

      【解决方案2】:

      最后它是这样工作的:

      output {  
      
          http {
                      url => "http://localhost:9200/%{[@metadata][miEntidad]}/_delete_by_query?conflicts=proceed"
                      http_method => "post"
                      format => "message"
                      content_type => "application/json; charset=UTF-8"
                      message => '{"query": { "term": { "properties.code.keyword": "%{[properties][code]}"  } }}'
      
          }
      
          jdbc {
                      connection_string => 'xxxxxxxx'       
                      statement => ["UPDATE test SET estate = 'A' WHERE entidad = ? ","%{[@metadata][miEntidad]}"]   
          } 
      }
      

      【讨论】:

        猜你喜欢
        • 2014-10-16
        • 1970-01-01
        • 1970-01-01
        • 2020-07-17
        • 1970-01-01
        • 2021-11-09
        • 1970-01-01
        • 1970-01-01
        • 2015-08-31
        相关资源
        最近更新 更多