【Question Title】: Logstash output different fields to different Elasticsearch indices
【Posted】: 2017-08-08 13:34:25
【Question Description】:

I have a Filebeat instance that ships Apache access logs to Logstash. A Logstash pipeline transforms each log line and loads the extracted fields (field1, field2, and field3) into Elasticsearch, into an index named indexA. The flow is simple and it works. Here is my pipeline.conf:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "source"
        remove_field => "type"
        remove_field => "tags"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}

Now what I want to do is add two more fields, field4 and field5, and send them to a separate index named indexB. So in the end indexA holds field1, field2, and field3, while indexB holds field4 and field5.

Here is the modified pipeline.conf so far, which does not seem to work:

input{
    beats{
        port => "5043"
    }
}
filter 
{

    grok 
    {
        patterns_dir => ["/usr/share/logstash/patterns"]
        match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                    "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                }
        remove_field => "@version"
        remove_field => "beat"
        remove_field => "input_type"
        remove_field => "type"
        remove_field => "http_version"
        remove_field => "@timestamp"
        remove_field => "message"
    }
    mutate
    {
        add_field => { "field1" => "%{access_time}" }
        add_field => { "field2" => "%{host}" }
        add_field => { "field3" => "%{read_timestamp}" }
    }   
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexA"
    }
}
filter
{
    mutate
    {
        add_field => { "field4" => "%{source}" }
        add_field => { "field5" => "%{tags}" }
        remove_field => "field1"
        remove_field => "field2"
        remove_field => "field3"
    }
}
output {
    elasticsearch{
        hosts => ["localhost:9200"]
        index => "indexB"
    }
}   

Can someone point out where I am going wrong, or suggest an alternative approach?

【Question Discussion】:

    Tags: elasticsearch logstash elastic-stack filebeat


    【Solution 1】:

    You need to duplicate your events using the clone filter. Then you can add the desired fields to each respective event and route them into two different ES indices:

    input{
        beats{
            port => "5043"
        }
    }
    filter 
    {
    
        grok 
        {
            patterns_dir => ["/usr/share/logstash/patterns"]
            match =>{   "message" => ["%{IPORHOST:[client_ip]} - %{DATA:[user_name]} \[%{HTTPDATE:[access_time]}\] \"%{WORD:[method]} %{DATA:[url]} HTTP/%{NUMBER:[http_version]}\" %{NUMBER:[response_code]} %{NUMBER:[bytes]}( \"%{DATA:[referrer]}\")?( \"%{DATA:[user_agent]}\")?",
                        "%{IPORHOST:[remote_ip]} - %{DATA:[user_name]} \\[%{HTTPDATE:[time]}\\] \"-\" %{NUMBER:[response_code]} -" ] 
                    }
            remove_field => "@version"
            remove_field => "beat"
            remove_field => "input_type"
            remove_field => "type"
            remove_field => "http_version"
            remove_field => "@timestamp"
            remove_field => "message"
        }
        clone {
            clones => ["log1", "log2"]
        }
        if [type] == "log1" {
            mutate
            {
                add_field => { "field1" => "%{access_time}" }
                add_field => { "field2" => "%{host}" }
                add_field => { "field3" => "%{read_timestamp}" }
            }
        } else {   
            mutate
            {
                add_field => { "field4" => "%{source}" }
                add_field => { "field5" => "%{tags}" }
            }
        }
    }
    output {
        if [type] == "log1" {
            elasticsearch{
                hosts => ["localhost:9200"]
                index => "indexA"
            }
        } else {   
            elasticsearch{
                hosts => ["localhost:9200"]
                index => "indexB"
            }
        }
    }   
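
    One caveat worth noting, as a hedged sketch: in classic (pre-ECS) Logstash, the clone filter emits the clones in addition to the original event, and only the clones get their [type] set to the clone name. Since the grok filter above removes [type], the original event carries no type and falls into the else branches, so indexB would receive both the original event and the "log2" clone. Assuming that default behavior, the original can be discarded right after cloning:

        clone {
            clones => ["log1", "log2"]
        }
        # The clone filter passes the original event through as well;
        # drop it so only the two typed clones reach the outputs.
        if ![type] {
            drop {}
        }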
    

    【Discussion】:

    • This was very helpful for my case. Thank you so much for revising the code.
    • Awesome, glad it helped!