【问题标题】:How to remove dynamic mongodb oid's in logstash?如何删除logstash中的动态mongodb oid?
【发布时间】:2021-04-04 09:58:03
【问题描述】:

Mongodb有2种$oid引用-

类型 1 -

//MongoDB
city_id : "5fe3206428bf745876649fd3"

//Kafka Message
city_id : {
    "$oid": "5fe3206428bf745876649fd3"
}

类型 2 -

//MongoDB
city_ids : ["5fe3206428bf745876649fd3","5fe3206428bf745876649fd3","5fe3206428bf745876649fd3"]

//Kafka Message
city_ids : [
  {
    "$oid": "5fe3206428bf745876649fd3"
  },
  {
    "$oid": "5fe3206428bf745876649fd3"
  },
  {
    "$oid": "5fe3206428bf745876649fd3"
  }
]

我如何在logstash中处理这两种类型,以便我得到elasticsearch的确切数据结构,因为它保存在MongoDB中。

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","cities"]
  }
}
filter { 
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }
    
    mutate {
        rename => { "[payload]" => "document"}
        remove_field => ["message","json_payload","payload"]
        add_field => {
          "[es_index]" => "%{[@metadata][kafka][topic]}" 
          "[mongo_id]" => "%{[document][_id][$oid]}"
        }
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        document_id => "%{mongo_id}"
    }
    stdout {
      codec =>
        rubydebug {
            metadata => true
        }
    }
}

这是对之前question 的跟进。

【问题讨论】:

    标签: mongodb elasticsearch apache-kafka logstash


    【解决方案1】:

    以下将对包含 $oid 条目的每个字段动态执行此操作。它对结构做了很多假设——如果它包含 $oid 条目,那么这就是保留的全部内容。

        ruby {
            code => '
                event.to_hash.each { |k, v|
                    if v.is_a? Hash
                        if v["$oid"]
                            event.set(k, v["$oid"])
                        end
                    end
                    if v.is_a? Array
                        if v[0]["$oid"]
                            a = []
                            v.each { |x| a << x["$oid"] }
                            event.set(k, a)
                        end
                    end
                }
            '
        }
    

    【讨论】:

    • 这可以是动态的吗?还是我必须为所有集合中的每个键编写特定的逻辑?这样,每次我修改我的集合并拥有类似的密钥时,我都需要在 logstash 中单独添加逻辑。
    • 你的意思是 [city_ids] 字段名称可以更改?
    • 是的,这就是我的意思,city_ids 就是一个例子。
    • $oid 将是每个此类键的共同子项。
    • ruby 过滤器更新以动态完成。
    猜你喜欢
    • 2017-06-10
    • 1970-01-01
    • 2014-08-14
    • 1970-01-01
    • 2022-01-14
    • 2021-08-08
    • 1970-01-01
    • 1970-01-01
    • 2015-02-01
    相关资源
    最近更新 更多