【问题标题】:aggregate multiple recursive logstash聚合多个递归logstash
【发布时间】:2020-07-29 20:34:30
【问题描述】:

我正在使用带输入 jdbc 的 logstash,并希望通过聚合将一个对象嵌入到另一个对象中。 如何使用添加递归?

即在另一个对象中添加一个对象?

这是一个例子:

{
  "_index": "my-index",
  "_type": "test",
  "_id": "1",
  "_version": 1,
  "_score": 1,
  "_source": {
    "id": "1",
    "properties": {
      "nested_1": [
        {
          "A": 0,
          "B": "true",
          "C": "PEREZ, MATIAS  ROGELIO Y/O",
          "Nested_2": [
            {
              "Z1": "true",
              "Z2": "99999"
            }
        },
        {
          "A": 0,
          "B": "true",
          "C": "SALVADOR MATIAS ROMERO",
          "Nested_2": [
            {
              "Z1": "true",
              "Z2": "99999"
            }
        }
      ]
    }
  }
}

我正在使用类似的东西,但它不起作用

aggregate {
  task_id => "%{id}"
  code => "
      map['id'] = event.get('id')
      
      map['nested_1_list'] ||= []
      map['nested_1'] ||= []
      if (event.get('id') != nil)
        if !( map['nested_1_list'].include?event.get('id') ) 
          map['nested_1_list'] << event.get('id')
 
          map['nested_1'] << {
            'A' => event.get('a'),                             
            'B' => event.get('b'),
            'C' => event.get('c'),
            
             map['nested_2_list'] ||= []
              map['nested_2'] ||= []
              if (event.get('id_2') != nil)
                if !( map['nested_2_list'].include?event.get('id_2') ) 
                  map['nested_2_list'] << event.get('id_2')
         
                  map['nested_2'] << {
                    'Z1' => event.get('z1'), 
                    'Z2' => event.get('z2')
                  }
                end
              end
          }
        end
      end
       
      event.cancel()
  "
  push_previous_map_as_event => true
  timeout => 3

} 

知道如何实现这个吗?...................... ..........

【问题讨论】:

  • 请帮帮我!

标签: elasticsearch logstash elastic-stack logstash-jdbc


【解决方案1】:

最后我所做的是,从输入中生成 JSON,即从从 logstash 的输入语句中的视图 (vw) 消费的存储过程。

使用后,我将其作为 json 进行处理,并且我已经将该 json 用作另一个变量。

# Convierto el string a json real (quita comillas y barras invertidas)
        ruby {
            code => "
                require 'json'
                json_value = JSON.parse(event.get('field_db').to_s)
                event.set('field_convert_to_json',json_value)
            "
        }

【讨论】:

    【解决方案2】:

    也许你可以试试这个。注意 这仅适用于您想要单个对象而不是对象数组时。 请访问我的博客了解其他格式。 https://xyzcoder.github.io/2020/07/29/indexing-documents-using-logstash-and-python.html

    input {
        jdbc {
               jdbc_driver_library => "/usr/share/logstash/javalib/mssql-jdbc-8.2.2.jre11.jar"
               jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
               jdbc_connection_string => "jdbc:sqlserver://host.docker.internal;database=StackOverflow2010;user=pavan;password=pavankumar@123"
               jdbc_user => "pavan"
               jdbc_password => "pavankumar@123"
               statement => "select top 500 p.Id as PostId,p.AcceptedAnswerId,p.AnswerCount,p.Body,u.Id as userid,u.DisplayName,u.Location
                            from StackOverflow2010.dbo.Posts p inner join StackOverflow2010.dbo.Users u
                            on p.OwnerUserId=u.Id"
            }
    }
    
    filter {
        aggregate {
            task_id => "%{postid}"
            code => "
                map['postid'] = event.get('postid')
                map['accepted_answer_id'] = event.get('acceptedanswerid')
                map['answer_count'] = event.get('answercount')
                map['body'] = event.get('body')
                map['user'] = {
                    'id' => event.get('userid'),
                    'displayname' => event.get('displayname'),
                    'location' => event.get('location')
                }
                map['user']['test'] = {
                        'test_body' => event.get('postid')
                    }
            event.cancel()"
            push_previous_map_as_event => true
            timeout => 30
        }
    }
    
    output {
        elasticsearch {
            hosts => ["http://elasticsearch:9200", "http://elasticsearch:9200"]
            index => "stackoverflow_top"
        }
        stdout {
            codec => rubydebug
        }
    }
    

    我的输出是

    {
            "_index" : "stackoverflow_top",
            "_type" : "_doc",
            "_id" : "S8WEmnMBrXsRTNbKO0JJ",
            "_score" : 1.0,
            "_source" : {
              "@version" : "1",
              "body" : """<p>How do I store binary data in <a href="http://en.wikipedia.org/wiki/MySQL" rel="noreferrer">MySQL</a>?</p>
    """,
              "@timestamp" : "2020-07-29T12:20:22.649Z",
              "answer_count" : 10,
              "user" : {
                "displayname" : "Geoff Dalgas",
                "location" : "Corvallis, OR",
                "test" : {
                  "test_body" : 17
                },
                "id" : 2
              },
              "postid" : 17,
              "accepted_answer_id" : 26
            }
    

    这里测试对象嵌套在用户对象中

    【讨论】:

      猜你喜欢
      • 2021-08-25
      • 2014-03-01
      • 2014-04-09
      • 2014-03-23
      • 2014-04-18
      • 1970-01-01
      • 2021-02-03
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多