【问题标题】:Logstash with elasticsearch output: how to write to different indices?具有弹性搜索输出的 Logstash:如何写入不同的索引?
【发布时间】:2017-03-09 14:46:07
【问题描述】:

我希望在这里找到我从昨天开始一直在努力的问题的答案:

我正在使用 rabbitMQ 输入和 elasticsearch 输出配置 Logstash 1.5.6。

消息以批量格式在rabbitMQ中发布,我的logstash使用它们并将它们全部写入elasticsearch默认索引logstash-YYY.MM.DD,配置如下:

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}

output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  }
stdout { codec => rubydebug }
}

现在我要做的是将消息发送到不同的弹性搜索索引。

来自 amqp 输入的消息已经具有索引和类型参数(批量格式)。

所以在阅读文档后: https://www.elastic.co/guide/en/logstash/1.5/event-dependent-configuration.html#logstash-config-field-references

我尝试这样做

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}

output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  index => "%{[index][_index]}"
  }
stdout { codec => rubydebug }
}

但logstash 所做的是创建索引 %{[index][_index]} 并将所有文档放在那里,而不是读取 _index 参数并将文档发送到那里!

我还尝试了以下方法:

index => %{index}
index => '%{index}'
index => "%{index}"

但似乎没有一个工作。

有什么帮助吗?

继续,这里的主要问题是:如果rabbitMQ消息具有这种格式:

{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}

如何告诉 logstash 以“typeX”类型在名为“indexA”的索引中发送输出??

【问题讨论】:

    标签: elasticsearch rabbitmq logstash bulk


    【解决方案1】:

    如果您在 RabbitMQ 中的消息已经是批量格式,那么您不需要使用elasticsearch 输出,但一个简单的http 输出点击_bulk 端点就可以解决问题:

    output {
        http {
            http_method => "post"
            url => "http://localhost:9200/_bulk"
            format => "message"
            message => "%{message}"
        }
    }
    

    【讨论】:

    • 感谢您的回答,但我通过您的方法得到以下信息:“[HTTP Output Failure] Encountered non-200 HTTP code 200”, :response_code=>400, :url=>"localhost:9200/_bulk ",
    • 我试图将日志错误附加到我的答案中,但字符数有限,如果没有发布未完成的评论,我什至无法进入“内联”!
    • 你能在你的输出部分添加这个并显示一个打印出来的事件吗:stdout { codec => plain }
    • 您还可以显示从 Elasticsearch 服务器日志中获得的错误日志吗?
    【解决方案2】:

    所以大家,在 Val 的帮助下,解决方案是:

    • 正如他所说,由于 RabbitMQ 消息已经是批量格式,不需要使用弹性搜索输出,_bulk API 的 http 输出就可以了(傻我)
    • 所以我用这个替换了输出:

      output {
         http {
         http_method => "post"
         url => "http://172.16.1.81:9200/_bulk"
         format => "message"
         message => "%{message}" 
      }
      stdout { codec => json_lines }
      }
      
    • 但它仍然无法正常工作。我使用的是 Logstash 1.5.6,升级到 Logstash 2.0.0 (https://www.elastic.co/guide/en/logstash/2.4/_upgrading_using_package_managers.html) 后,它使用相同的配置。

    就是这样:)

    【讨论】:

    • 基本上,我的解决方案 + 升级到 Logstash 2 ;-)
    • + 你可以去掉stdout输出,那只是为了调试
    • 升级后,您问题中的 elasticsearch 输出应该也能正常工作
    【解决方案3】:

    如果你在 Rabbitmq 中存储 JSON 消息,那么这个问题就可以解决。 在 JSON 消息中使用索引和类型作为字段,并将这些值分配给 Elasticsearch 输出插件。

    索引 => “%{指数}” //来自 Kafka Producer document_type 的 JSON 正文的索引 => "%{type}" }              //来自 JSON 正文的类型

    使用这种方法,每条消息都可以有自己的索引和类型。

       

    【讨论】:

      猜你喜欢
      • 2023-03-16
      • 1970-01-01
      • 2022-01-18
      • 1970-01-01
      • 1970-01-01
      • 2016-03-28
      • 2022-12-04
      • 2015-03-16
      • 1970-01-01
      相关资源
      最近更新 更多