input
{
    kafka
    {
        bootstrap_servers => "192.168.32.36:9092,192.168.32.37:9092,192.168.32.38:9092"
        topics => "msa-log-prod"
        codec => "json"
        group_id => "msa-log-prod-elsearch"
        consumer_threads => 4
        decorate_events => true
    }
    kafka
    {
        bootstrap_servers => "192.168.32.36:9092,192.168.32.37:9092,192.168.32.38:9092"
        topics => "msa-log-test"
        codec => "json"
        group_id => "msa-log-test-elsearch"
        consumer_threads => 2
        decorate_events => true
    }
}

output
{
    if [env] == "prod" {
        elasticsearch
        {
            hosts => ["192.168.32.36:9200","192.168.32.37:9200","192.168.32.38:9200"]
            index => "msa-log-prod-%{+YYYY.MM.dd}"
        }
    }
    if [env] == "test" {
        elasticsearch
        {
            hosts => ["192.168.32.36:9200","192.168.32.37:9200","192.168.32.38:9200"]
            index => "msa-log-test-%{+YYYY.MM.dd}"
        }
    }
}

说明:需要kafka消息格式是json并且包含一个env字段用于区分环境,如果仅一个input和output可以去掉if判断。

相关文章:

  • 2022-12-23
  • 2022-01-25
  • 2021-12-24
  • 2022-12-23
  • 2022-12-23
  • 2021-12-27
  • 2021-05-16
  • 2021-12-02
猜你喜欢
  • 2021-05-29
  • 2022-12-23
  • 2021-11-20
  • 2022-12-23
  • 2021-08-26
相关资源
相似解决方案