[Title]: Serilog HTTP sink + Logstash: Splitting Serilog message array into individual log events
[Posted]: 2017-06-04 10:19:28
[Question]:

We use the Serilog HTTP sink to send messages to Logstash, but the HTTP message body looks like this:

{
  "events": [
    {
      "Timestamp": "2016-11-03T00:09:11.4899425+01:00",
      "Level": "Debug",
      "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
      "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
      "Properties": {
        "Heartbeat": {
          "UserName": "Mike",
          "UserDomainName": "Home"
        },
        "Computer": "Workstation"
      }
    },
    {
      "Timestamp": "2016-11-03T00:09:12.4905685+01:00",
      "Level": "Debug",
      "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
      "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
      "Properties": {
        "Heartbeat": {
          "UserName": "Mike",
          "UserDomainName": "Home"
        },
        "Computer": "Workstation"
      }
    }
  ]
}

That is, the logging events are batched into an array. The messages could be sent one at a time, but that would still be a single-item array.

The event then shows up in Kibana with a field message whose value is

{
  "events": [
    {
      // ...
    },
    {
      // ...
    }
  ]
}

That is, literally the content that arrived from the HTTP input.

How can the items in the events array be split into individual log events, with the properties "pulled" up to the top level, so that Elasticsearch ends up with two log events:


{
  "Timestamp": "2016-11-03T00:09:11.4899425+01:00",
  "Level": "Debug",
  "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
  "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
  "Properties": {
    "Heartbeat": {
      "UserName": "Mike",
      "UserDomainName": "Home"
    },
    "Computer": "Workstation"
  }
}

{
  "Timestamp": "2016-11-03T00:09:12.4905685+01:00",
  "Level": "Debug",
  "MessageTemplate": "Logging {@Heartbeat} from {Computer}",
  "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
  "Properties": {
    "Heartbeat": {
      "UserName": "Mike",
      "UserDomainName": "Home"
    },
    "Computer": "Workstation"
  }
}

I tried the Logstash json and split filters, but could not make it work.

[Discussion]:

    Tags: c# logging logstash elastic-stack serilog


    [Solution 1]:

    You can achieve what you want by using an additional ruby filter to extract the fields from the sub-structure:

    filter {
      split {
        field => "events"
      }
      ruby {
        code => "
          event.to_hash.update(event['events'].to_hash)
          event.to_hash.delete_if {|k, v| k == 'events'}
        "
      }
    }
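To see what the ruby filter does, the same transformation can be run on a plain Ruby hash. This is a standalone sketch (outside Logstash, where event would be a real Event object rather than a bare hash):

```ruby
# Standalone sketch of the ruby filter's logic after split: merge the
# nested "events" hash into the top level, then drop the wrapper key.
event = {
  'events' => {
    'Timestamp' => '2016-11-03T00:09:11.4899425+01:00',
    'Level'     => 'Debug'
  },
  '@timestamp' => '2017-01-20T04:51:39.223Z'
}

event.update(event['events'])  # copy the nested fields to the top level
event.delete('events')         # remove the now-redundant wrapper

# event now holds Timestamp, Level and @timestamp side by side
```

In Logstash 2.x this works because event.to_hash hands back the event's backing hash, so mutating it mutates the event itself (Solution 2 below covers why this stops working in 5.0).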
    

    The resulting event will look like this:

    {
               "@version" => "1",
             "@timestamp" => "2017-01-20T04:51:39.223Z",
                   "host" => "iMac.local",
              "Timestamp" => "2016-11-03T00:09:12.4905685+01:00",
                  "Level" => "Debug",
        "MessageTemplate" => "Logging {@Heartbeat} from {Computer}",
        "RenderedMessage" => "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\"",
             "Properties" => {
            "Heartbeat" => {
                      "UserName" => "Mike",
                "UserDomainName" => "Home"
            },
             "Computer" => "Workstation"
        }
    }
    

    [Discussion]:

    • Thanks, that helped a lot. I had to add json { source => "message" } first to pull events up as an object at the root of the log event; apparently Serilog sends the message without an application/json content type, so the whole HTTP body was dumped as a string into the message field. After that, your solution worked perfectly.
    • Cool. You can use either a json filter or a json codec on the input, your choice. Glad it helped!
    • Oh, I missed the codec option! I'll actually go with the json codec, it feels cleaner. Thanks!
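    Putting the comment's fix together with the accepted answer, a minimal sketch of the full filter chain for Logstash 2.x (field names follow the examples above):

    ```
    filter {
      json {
        source => "message"   # parse the HTTP body that arrived as a plain string
      }
      split {
        field => "events"     # one Logstash event per array item
      }
      ruby {
        code => "
          event.to_hash.update(event['events'].to_hash)
          event.to_hash.delete_if {|k, v| k == 'events'}
        "
      }
    }
    ```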
    [Solution 2]:

    After upgrading to Logstash 5.0, Val's solution stopped working because of changes in the Event API: updates to event.to_hash were no longer reflected in the original event. With Logstash 5.0+, fields must be accessed through the event.get('field') and event.set('field', value) accessors.

    The updated solution now becomes:

    input {
      http {
        port => 8080
        codec => json
      }
    }
    
    filter {
      split {
        field => "events"
      }
      ruby {
        code => "
          event.get('events').each do |k, v|
            event.set(k, v)
          end
        "
      }
      mutate {
        remove_field => [ "events" ]
      }
    }
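    The loop in the ruby filter relies entirely on the get/set accessors. A standalone sketch of those semantics, using a minimal stand-in for the Logstash Event class (the class here is hypothetical and only mimics the accessors used above):

```ruby
# Hypothetical stand-in for the Logstash 5.x Event API, just enough to
# illustrate the get/set-based field promotion used in the filter above.
class Event
  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end

  def remove(field)
    @data.delete(field)
  end
end

event = Event.new(
  'events' => { 'Level' => 'Debug', 'Computer' => 'Workstation' }
)

# Same logic as the ruby filter: promote each nested field to the top level.
event.get('events').each do |k, v|
  event.set(k, v)
end
event.remove('events')  # corresponds to the mutate/remove_field step
```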
    

    [Discussion]:

    [Solution 3]:

    You can now achieve this by setting a batchFormatter. The default batch formatter produces the faulty events, but the ArrayBatchFormatter fixes this:

     logger.WriteTo.DurableHttpUsingFileSizeRolledBuffers(
         requestUri: new Uri($"http://{elasticHost}:{elasticPort}").ToString(),
         batchFormatter: new ArrayBatchFormatter());
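    As far as I understand the Serilog.Sinks.Http batch formatters, ArrayBatchFormatter makes the request body a top-level JSON array instead of an object wrapping an events field, e.g.:

    ```json
    [
      {
        "Timestamp": "2016-11-03T00:09:11.4899425+01:00",
        "Level": "Debug",
        "RenderedMessage": "Logging { UserName: \"Mike\", UserDomainName: \"Home\" } from \"Workstation\""
      }
    ]
    ```

    With that shape, the json codec on the Logstash http input should emit one event per array element on its own, so the split and ruby filters from the earlier solutions are no longer needed.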
    

    [Discussion]:
