【问题标题】:logstash GROK filter along with KV plugin couldn't able to process the eventslogstash GROK 过滤器和 KV 插件无法处理事件
【发布时间】:2019-11-01 21:11:23
【问题描述】:

我是 ELK 的新手。当我载入下面的日志文件时,它将进入logstash中的“死信队列”,因为logstash无法处理事件。我已经编写了GROK过滤器来解析事件但logstash仍然无法处理事件.任何帮助将不胜感激。

以下是示例日志格式。

25193662345 [http-nio-8080-exec-44] DEBUG c.s.b.a.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=31, totalTime=33 tenantId=b9sdfs-1033-4444-aba5-csdfsdfsf, immutableBlobId=bss_c_586331/Sample_app12-sdas-157123148464.txt, blobSize=2862, domain=abc

2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde

GROK 过滤器:

dissect { mapping => { "message" => "%{NUMBER:number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" } }
    kv { source => "[@metadata][msg]" field_split => "," }

谢谢

【问题讨论】:

    标签: logstash logstash-grok logstash-configuration logstash-file logstash-forwarder


    【解决方案1】:

    你的配置基本上有两个问题。

    1.) 您使用的是dissect 过滤器,而不是grok,两者都用于解析消息,但grok 使用正则表达式来验证字段的值,而dissect 只是位置,它不执行任何验证,如果您在需要 NUMBER 的字段的位置有一个 WORD 值,grok 将失败,但dissect 不会。

    如果你的日志行总是有相同的模式,你应该继续使用dissect,因为它更快并且需要更少的cpu。

    您正确的dissect 映射应该是:

    dissect {
        mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
    }
    

    2.) 包含 kv 消息的字段是错误的,它有由空格和逗号分隔的字段,kv 不会这样工作。

    在您的dissect 过滤后,这是[@metadata][msg] 的内容。

    method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde
    

    要解决此问题,您应该使用 mutate 过滤器从 [@metadata][msg] 中删除逗号,并使用默认配置的 kv 过滤器。

    这应该是您的过滤器配置

    filter {
        dissect {
            mapping => { "message" => "%{number} [%{thread}] %{level} %{class} - %{[@metadata][msg]}" }
        }
        mutate {
            gsub => ["[@metadata][msg]",",",""]
        }
        kv {
            source => "[@metadata][msg]"
        }
    }
    

    你的输出应该是这样的:

    {
                 "number" => "2519366789",
             "@timestamp" => 2019-11-03T16:42:11.708Z,
                 "thread" => "http-nio-8080-exec-47",
           "appLogicTime" => "1",
                 "domain" => "cde",
                 "method" => "PUT",
                  "level" => "DEBUG",
               "blobSize" => "2862",
               "@version" => "1",
        "immutableBlobId" => "bss_c_586334/Sample_app15-615223-157sadas6648465.txt",
           "streamInTime" => "0",
                 "status" => "201",
        "blobStorageTime" => "32",
                "message" => "2519366789 [http-nio-8080-exec-47] DEBUG q.s.b.y.m.PerformanceMetricsFilter - method=PUT status=201 appLogicTime=1, streamInTime=0, blobStorageTime=32, totalTime=33 tenantId=b0csdfsd-1066-4444-adf4-ce7bsdfssdf, immutableBlobId=bss_c_586334/Sample_app15-615223-157sadas6648465.txt, blobSize=2862, domain=cde",
              "totalTime" => "33",
               "tenantId" => "b0csdfsd-1066-4444-adf4-ce7bsdfssdf",
                  "class" => "q.s.b.y.m.PerformanceMetricsFilter"
    }
    

    【讨论】:

    • 感谢您的回复。当我尝试上述建议的过滤器时,logstash 正在将事件发送到死信队列。
    • 我已经测试了管道并且它没有任何错误,请用你的完整管道和你得到的任何错误日志更新你的问题。另外,请指定您使用的版本。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-03-30
    • 1970-01-01
    • 1970-01-01
    • 2017-07-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多