【问题标题】:Fluentd filter to exclude key with empty valueFluentd 过滤器以排除具有空值的键
【发布时间】:2017-08-21 03:19:16
【问题描述】:

我想排除 seive_name 为空 "service_name":"" 的行。 这是我的流利的conf

## match tag=debug.** and dump to console
<match debug.**>
  @type stdout
</match>
<source>
  @type tail
  path  /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
  pos_file /var/log/td-agent/tmp/wso2carbon.log.pos
  tag debug.wso2.esb
  format /^([TID:]* [^ ]* [^ ]* \[(?<time>[^\]]*)\]) ([^ ]* (?<level>[^ ]*))([^***]*[^=]*[^ ]*(?<service_name>[^,]*)[^=]*[^ ]*(?<step>[^,]*)[^ ]*[^=]*[^ ]*(?<message_id>[^,]*))/
  time_format %Y-%m-%d %H:%M:%S
#  keep_time_key true
</source>

这是日志输出

2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" before vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":"","step":"","message_id":""}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" after vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 10:16:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SERVICE_NAME","step":" Before - SA_ServiceApiDSEp","message_id":" urn:uuid:39e0ecc1-dda5-4cd9-91fc-90e7ed4f5233"}

我想排除下面的行。该怎么做?

{"level":"INFO","service_name":"","step":"","message_id":""}

第二个问题是为什么我在值"service_name":" SERVICE_NAME"之前有一个空格,当我尝试Fluentular I get a nice output without space时。

我已经通过在正则表达式中添加一个空格来解决第二个任务。例如改变

[^=]*[^ ]*(?&lt;service_name&gt;[^,]*)[^=]*[^ ]* (?&lt;service_name&gt;[^,]*)


但我不知道如何编写过滤器来排除 key_name 具有空值的记录,例如 "service_name":""

【问题讨论】:

    标签: regex fluentd


    【解决方案1】:

    grep 过滤器插件的exclude 指令的使用似乎相当简单。

    使用“开始”(^)匹配空消息并排除它,然后是空和结束($)可以通过以下方式完成。

    <filter **>
      @type grep
      <exclude>
        key service_name
        pattern /^$/
        # or, to exclude all messages that are empty or include only white-space:
        # pattern /^\s*$/
      </exclude>
    </filter>
    

    请注意,正则表达式符号在 0.12 和 1.x 之间发生了变化(现在使用前导和尾随斜杠)。

    【讨论】:

      【解决方案2】:

      因为我找不到排除键具有空值的记录的解决方案,所以我使用反向解决方案。我使用 grep 来记录指定的键值。请参阅下面的 Fluentd 配置。

      Fluentd 在每个 WSO2 节点上。

      #############################################################################################
      # Fluentd Configuration File                                                                #
      #                                                                                           #
      # In v1 configuration, type and id are @ prefix parameters.                                 #
      # @type and @id are recommended. type and id are still available for backward compatibility #
      #############################################################################################
      
      ################################
      #            Source            #
      ################################
      ## built-in TCP input
      ## $ echo <json> | fluent-cat <tag>
      <source>
        @type forward
        @id forward_input
      
        port 24224
      </source>
      
      # Listen DRb for debug
      <source>
        @type debug_agent
        @id debug_agent_input
      
        bind 127.0.0.1
        port 24230
      </source>
      
      # HTTP input
      # http://localhost:8888/<tag>?json=<json>
      #<source>
      #  @type http
      #  @id http_input
      
      #  port 8888
      #</source>
      
      # Listen HTTP for monitoring
      # http://localhost:24220/api/plugins
      # http://localhost:24220/api/plugins?type=TYPE
      # http://localhost:24220/api/plugins?tag=MYTAG
      <source>
        @type monitor_agent
        @id monitor_agent_input
      
        port 24220
      </source>
      
      <source>
        @type tail
      
        path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
        pos_file /cellcard/fluent/wso2carbon.log.pos
        tag wso2.esb.service.test
        format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<transaction_id>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<data>[^,]*))/
        time_format %Y-%m-%d %H:%M:%S
        keep_time_key true
      </source>
      
      <source>
        @type tail
      
        path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
        pos_file /cellcard/fluent/wso2carbon.log.pos
        tag wso2.esb.ne.surepay
        format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<transaction_id>[^,]*)[^<?]+(?<payload>[^{]*))/
        time_format %Y-%m-%d %H:%M:%S
        keep_time_key true
      </source>
      
      <source>
        @type tail
      
        path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
        pos_file /cellcard/fluent/wso2carbon.log.pos
        tag wso2.esb.surepay.trigger
        format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^*]+[^=]+[^ ]+(?<client_ip>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<req_id>[^,]*)[^=]+[^ ]+(?<content_massage>[^,]*)[^=]+[^ ]+)/
        time_format %Y-%m-%d %H:%M:%S
        keep_time_key true
      </source>
      
      
      ###########################
      #        Filter           #
      ###########################
      <filter wso2.esb.service.**>
        @type grep
      
        <regexp>
          key     service_name
          pattern ^\sNew
        </regexp>
      </filter>
      
      <filter wso2.esb.service.**>
        @type record_transformer
        enable_ruby
      
        <record>
          data ${record["data"].strip.split(";").each_slice(2).to_h.to_json}
        </record>
      </filter>
      
      <filter wso2.esb.service.**>
        @type parser
      
        format json
        key_name data
      </filter>
      
      <filter wso2.esb.ne.surepay>
        @type grep
      
        <regexp>
          key     service_name
          pattern ^\sNE_SurePay
        </regexp>
      </filter>
      
      <filter wso2.esb.ne.surepay>
        @type record_transformer
        enable_ruby
      
        <record>
          service_name ${record["service_name"].strip!}
          transaction_id ${record["transaction_id"].strip!}
          payload ${record["payload"].strip!}
        </record>
      </filter>
      
      <filter wso2.esb.surepay.trigger>
        @type grep
      
        <regexp>
          key     service_name
          pattern ^\sSurePayPassiveTrigger
        </regexp>
      </filter>
      
      <filter wso2.esb.surepay.trigger>
        @type record_transformer
        enable_ruby
      
        <record>
          client_ip ${record["client_ip"].strip!}
          service_name ${record["service_name"].strip!}
          req_id ${record["req_id"].strip!}
          content_massage ${record["content_massage"].strip!}
        </record>
      </filter>
      
      
      ###########################
      #        Output           #
      ###########################
      ## Debug
      ## match tag=debug.** and dump to console
      <match debug.**>
        @type stdout
        @id stdout_output
      </match>
      
      ## ESB Service Log
      ## match tag=wso2.esb.**. Forward to Fluentd Collector (, stdout for debug) and write to file
      <match wso2.esb.**>
        @type copy
      
        <store>
          @type forward
          @id forward_output
          buffer_path /cellcard/fluent/buffer/fluentd.forward
          buffer_type file
          flush_interval 10
          send_timeout 60
          heartbeat_type tcp
          heartbeat_interval 20
      
          <server>
            host 172.16.100.243
            port 24224
          </server>
          ## If have sencondary fluentd server for fail-over, enable <secondary> block
          # <secondary>
          #   <server>
          #     host 192.168.0.12
          #   </server>
          # </secondary>
        </store>
      
        <store>
          @type file
          @id file_output
      
          path  /cellcard/fluent/log/wso2
          time_slice_format %Y%m%d%H
          time_slice_wait   10m
          time_format       %Y-%m-%d %H:%M:%S%z
        </store>
      
        <store>
          @type stdout
        </store>
      </match>
      

      Fluentd 收集器(从每个节点上的所有 fluentd 收集数据):

      #############################################################################################
      # Fluentd Server Configuration File                                                                #
      #                                                                                           #
      # In v1 configuration, type and id are @ prefix parameters.                                 #
      # @type and @id are recommended. type and id are still available for backward compatibility #
      #############################################################################################
      
      ################################
      #            Source            #
      ################################
      ## built-in TCP input
      ## $ echo <json> | fluent-cat <tag>
      <source>
        @type forward
        @id forward_input
      
        port 24224
      </source>
      
      # Listen DRb for debug
      <source>
        @type debug_agent
        @id debug_agent_input
      
        bind 127.0.0.1
        port 24230
      </source>
      
      # HTTP input
      # http://localhost:8888/<tag>?json=<json>
      #<source>
      #  @type http
      #  @id http_input
      
      #  port 8888
      #</source>
      
      # Listen HTTP for monitoring
      # http://localhost:24220/api/plugins
      # http://localhost:24220/api/plugins?type=TYPE
      # http://localhost:24220/api/plugins?tag=MYTAG
      <source>
        @type monitor_agent
        @id monitor_agent_input
      
        port 24220
      </source>
      
      
      ###########################
      #        Filter           #
      ###########################
      # <filter wso2.esb.service.**>
      #   @type grep
      
      #   <regexp>
      #     key     service_name
      #     pattern ^New
      #   </regexp>
      # </filter>
      
      # <filter wso2.esb.ne.surepay>
      #   @type grep
      
      #   <regexp>
      #     key     service_name
      #     pattern ^NE_SurePay
      #   </regexp>
      # </filter>
      
      # <filter wso2.esb.ne.surepay>
      #   @type grep
      
      #   <regexp>
      #     key     service_name
      #     pattern ^SurePayPassiveTrigger
      #   </regexp>
      # </filter>
      
      
      ###########################
      #        Output           #
      ###########################
      ## Debug
      ## match tag=debug.** and dump to console
      <match debug.**>
        @type stdout
        @id stdout_output
      </match>
      
      ## ESB Service Log
      ## match tag=wso2.esb.service.** and insert into database (, stdout for debug) and write to file
      <match wso2.esb.**>
        @type copy
      
        <store>
          @type sql
          buffer_path /cellcard/fluent/buffer/fluentd.sql
          buffer_type file
          flush_interval 10
      
          host {ORACLE_HOST}
          port 1521
          database {ORACLE_DATABASE}
          adapter oracle_enhanced
          username {ORACLE_USERNAME}
          password {ORACLE_PADDWORD}
      
          <table>
            table {TABLE_NAME}
            column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
            # This is the default table because it has no "pattern" argument in <table>
            # The logic is such that if all non-default <table> blocks
            # do not match, the default one is chosen.
            # The default table is required.
          </table>
      
          <table wso2.esb.service.test>
            table {TABLE_NAME}
            column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
          </table>
      
          <table wso2.esb.ne.surepay>
            table {TABLE_NAME}
            column_mapping 'time:insert_date,transaction_id:transaction_id,service_name:service_name,payload:payload'
          </table>
      
          <table wso2.esb.surepay.trigger>
            table {TABLE_NAME}
            column_mapping 'time:insert_date,client_ip:client_ip,service_name:service_name,req_id:req_id,content_massage:content_massage'
          </table>
        </store>
      
        <store>
          @type file
          path  /cellcard/fluent/log/service
          time_slice_format %Y%m%d%H
          time_slice_wait   10m
          time_format       %Y-%m-%d %H:%M:%S%z
        </store>
      
        <store>
          @type stdout
        </store>
      </match>
      

      注意:我使用 frontd 从 WSO2 跟踪日志,然后插入到 Oracle 数据库中。

      平台: RedHat 7、ruby 2.4.1p111、fluentd 0.12.40、activerecord-oracle_enhanced-adapter (1.8.2)、ruby-oci8 (2.2.5)、fluent-plugin-sql (0.6.1)。

      更新 我已经在 GitHub 上发布了所有配置和安装细节 https://github.com/oemdaro/fluent-oracle-example

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-24
        • 1970-01-01
        • 2019-03-26
        • 2014-06-18
        • 2013-03-09
        • 2015-03-11
        相关资源
        最近更新 更多