[Title]: Using field as input to Logstash Grok filter pattern
[Posted]: 2014-10-23 07:23:44
[Question]:

I'd like to know whether it's possible to use a field from the Logstash message as the input to a grok pattern. Say I have an entry that looks like this:

{
    "message":"10.1.1.1",
    "grok_filter":"%{IP:client}"
}

I'd like to be able to do something like this:

filter {
  grok {
    match => ["message", ["%{grok_filter}"]]
  }
}

The problem is that this crashes Logstash: it appears to treat "%{grok_filter}" as the grok pattern itself rather than as the value of the grok_filter field. After Logstash crashes I get the following:

The error reported is: 
  pattern %{grok_filter} not defined

Is there any way to get the value of a field from inside the grok filter block and use it as the grok pattern?

[Discussion]:

    Tags: filter logstash grok


    [Solution 1]:

    The answer is no -- the grok filter compiles its pattern when the filter is initialized. If you need to do something like this, you'll have to write your own filter that compiles the pattern on every event (and pay the performance penalty).
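    For illustration only, the core idea -- expanding a pattern reference and compiling it per event -- can be sketched in plain Ruby with a hypothetical two-entry pattern table (the real grok library loads many more patterns from files and is not used here):

```ruby
# Minimal sketch of per-event grok compilation. PATTERNS is a
# hypothetical stand-in for the grok pattern files.
PATTERNS = {
  "IP"   => '\d{1,3}(?:\.\d{1,3}){3}',
  "WORD" => '\w+',
}

# Expand %{NAME:field} references into named capture groups and
# compile the result into a Regexp.
def compile_grok(pattern)
  src = pattern.gsub(/%\{(\w+):(\w+)\}/) do
    "(?<#{$2}>#{PATTERNS.fetch($1)})"
  end
  Regexp.new(src)
end

# Compile the pattern carried by the event itself (the performance
# cost mentioned above: this happens on every event), then return a
# hash of the named captures, or nil on no match.
def grok_match(event)
  re = compile_grok(event["grok_filter"])
  m  = re.match(event["message"]) or return nil
  m.names.each_with_object({}) { |n, h| h[n] = m[n] }
end

event = { "message" => "10.1.1.1", "grok_filter" => "%{IP:client}" }
grok_match(event)  # => {"client"=>"10.1.1.1"}
```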

    Without knowing why you want to do this, it's hard to recommend the best course of action. If you have a limited number of patterns, you could set a grok_filter_type parameter and then have a bunch of if [grok_filter_type] == 'ip' { grok { ... } } type things.
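    That conditional approach might look like the following sketch (the grok_filter_type field and the pattern choices are illustrative, not from the question):

```
filter {
  if [grok_filter_type] == "ip" {
    grok { match => ["message", "%{IP:client}"] }
  } else if [grok_filter_type] == "user" {
    grok { match => ["message", "%{USER:user}"] }
  }
}
```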

    Here's a custom filter that will let you do what you want -- it's mostly a copy of the grok code, with some changes/simplifications. I've tested it and it seems to work for me.

    # encoding: utf-8
    require "logstash/filters/base"
    require "logstash/namespace"
    require "logstash/environment"
    require "set"
    
    # A version of grok that can parse from a log-defined pattern.  Not really
    # recommended for high usage patterns, but for the occasional pattern it
    # should work
    #     filter {
    #       grok_dynamic {
    #         match_field => "message"
    #         pattern_field => "message_pattern"
    #       }
    #     }
    #
    class LogStash::Filters::GrokDynamic < LogStash::Filters::Base
      config_name "grok_dynamic"
      milestone 1
    
      # The field that contains the data to match against
      config :match_field, :validate => :string, :required => true
      # the field that contains the pattern
      config :pattern_field, :validate => :string, :required => true
      # where the patterns are
      config :patterns_dir, :validate => :array, :default => []
    
      # If true, only store named captures from grok.
      config :named_captures_only, :validate => :boolean, :default => true
    
      # If true, keep empty captures as event fields.
      config :keep_empty_captures, :validate => :boolean, :default => false
    
      # Append values to the 'tags' field when there has been no
      # successful match
      config :tag_on_failure, :validate => :array, :default => ["_grokparsefailure"]
    
      # The fields to overwrite.
      #
      # This allows you to overwrite a value in a field that already exists.
      config :overwrite, :validate => :array, :default => []
    
      # Detect if we are running from a jarfile, pick the right path.
      @@patterns_path ||= Set.new
      @@patterns_path += [LogStash::Environment.pattern_path("*")]
    
      public
      def initialize(params)
        super(params)
        @handlers = {}
      end
    
      public
      def register
        require "grok-pure" # rubygem 'jls-grok'
    
        @patternfiles = []
    
        # Have @@patterns_path show first. Last-in pattern definitions win; this
        # will let folks redefine built-in patterns at runtime.
        @patterns_dir = @@patterns_path.to_a + @patterns_dir
        @logger.info? and @logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
        @patterns_dir.each do |path|
          if File.directory?(path)
            path = File.join(path, "*")
          end
    
          Dir.glob(path).each do |file|
            @logger.info? and @logger.info("Grok loading patterns from file", :path => file)
            @patternfiles << file
          end
        end
    
        @patterns = Hash.new { |h,k| h[k] = [] }
    
        @grok = Grok.new
        @patternfiles.each { |path| @grok.add_patterns_from_file(path) }
    
      end # def register
    
      public
      def filter(event)
        return unless filter?(event)
        return if event[@match_field].nil? || event[@pattern_field].nil?
    
        @logger.debug? and @logger.debug("Running grok_dynamic filter", :event => event)
        @grok.compile(event[@pattern_field])
        if match(@grok,@match_field, event)
          filter_matched(event)
        else
          # Tag this event if we can't parse it. We can use this later to
          # reparse+reindex logs if we improve the patterns given.
          @tag_on_failure.each do |tag|
            event["tags"] ||= []
            event["tags"] << tag unless event["tags"].include?(tag)
          end
        end
    
        @logger.debug? and @logger.debug("Event now: ", :event => event)
      end # def filter
    
      private
      def match(grok, field, event)
        input = event[field]
        if input.is_a?(Array)
          success = true
          input.each do |input|
            match = grok.match(input)
            if match
              match.each_capture do |capture, value|
                handle(capture, value, event)
              end
            else
              success = false
            end
          end
          return success
        #elsif input.is_a?(String)
        else
          # Convert anything else to string (number, hash, etc)
          match = grok.match(input.to_s)
          return false if !match
    
          match.each_capture do |capture, value|
            handle(capture, value, event)
          end
          return true
        end
      rescue StandardError => e
        @logger.warn("Grok regexp threw exception", :exception => e.message)
      end
    
      private
      def handle(capture, value, event)
        handler = @handlers[capture] ||= compile_capture_handler(capture)
        return handler.call(value, event)
      end
    
      private
      def compile_capture_handler(capture)
        # SYNTAX:SEMANTIC:TYPE
        syntax, semantic, coerce = capture.split(":")
    
        # each_capture do |fullname, value|
        #   capture_handlers[fullname].call(value, event)
        # end
    
        code = []
        code << "# for capture #{capture}"
        code << "lambda do |value, event|"
        #code << "  p :value => value, :event => event"
        if semantic.nil?
          if @named_captures_only
            # Abort early if we are only keeping named (semantic) captures
            # and this capture has no semantic name.
            code << "  return"
          else
            field = syntax
          end
        else
          field = semantic
        end
        code << "  return if value.nil? || value.empty?" unless @keep_empty_captures
        if coerce
          case coerce
            when "int"; code << "  value = value.to_i"
            when "float"; code << "  value = value.to_f"
          end
        end
    
        code << "  # field: #{field}"
        if @overwrite.include?(field)
          code << "  event[field] = value"
        else
          code << "  v = event[field]"
          code << "  if v.nil?"
          code << "    event[field] = value"
          code << "  elsif v.is_a?(Array)"
          code << "    event[field] << value"
          code << "  elsif v.is_a?(String)"
          # Promote to array since we aren't overwriting.
          code << "    event[field] = [v, value]"
          code << "  end"
        end
        code << "  return"
        code << "end"
    
        #puts code
        return eval(code.join("\n"), binding, "<grok capture #{capture}>")
      end # def compile_capture_handler
    
    end # class LogStash::Filters::GrokDynamic
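
    Assuming the file is placed on the Logstash plugin path (e.g. under logstash/filters/), the event from the question would then be handled with a configuration along these lines:

```
filter {
  grok_dynamic {
    match_field   => "message"
    pattern_field => "grok_filter"
  }
}
```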
    

    [Comments]:

    • The use case is a logging solution where the clients shipping logs can include a "type" attribute (using logstash-forwarder), so I can define many grok patterns on the server side. But when I want to parse a less common log, I'd rather configure a grok pattern on the client than reconfigure and restart Logstash every time. I haven't looked closely at the Grok source. Do you think I could adapt the grok filter to compile a pattern the first time it's received, by supplying the pattern and a pattern name -- i.e. grok_name and grok_pattern as fields?
    • That seems like your best bet... make a new filter based on grok that can take a grok_from_field, and then set up the new grok filter that way... you could drop a lot of the complexity of handling an array of patterns. I might take a shot at it myself this weekend or next week, since it sounds like a fun challenge.
    • Thanks for all the help. I'll update if I give it a try. Unfortunately I'm a Python guy on this project, so I'll need to shift gears.
    • Has there been any news on this topic since 2014, or do I still need a custom filter to accomplish this?