【问题标题】:Filter Kafka JSON messages with Logstash grok使用 Logstash grok 过滤 Kafka JSON 消息
【发布时间】:2017-11-10 17:08:20
【问题描述】:

我尝试仅过滤德国 (DE) 的 kafka json 消息。为此,我必须写一个 grok 表达式。谁能帮我为这个 json 写一个 grok 模式?

{"table":"ORDERS","type":"I","payload":{"ID":"28112","COUNTRY":"DE","AMT":15.36}}
{"table":"ORDERS","type":"I","payload":{"ID":"28114","COUNTRY":"US","AMT":25.75}}

抱歉,我是这些技术的新手。这是我的 logstash.conf 的样子:

input { 
  kafka {topics => [ "test" ] auto_offset_reset => "earliest" } 
} 

filter { 
  grok {
    match => { "message" => "?????????" }

  if [message] =~ "*COUNTRY*DE*" { 
    drop{}
  }
}      
}

output { file { path => "./test.txt"  } }

最后我只想提交德国订单。希望得到帮助,谢谢!

【问题讨论】:

  • 为什么需要 grok?您只需使用if [message] =~ "COUNTRY\":\"DE\"" { drop{} },它应该可以正常工作。

标签: json apache-kafka logstash logstash-grok


【解决方案1】:

您需要使用 Logstash 吗? 如果没有,我会建议一个简单的 KSQL 语句

CREATE STREAM GERMAN_ORDERS AS SELECT * FROM ORDERS WHERE COUNTRY='DE';

这会创建一个从第一个开始流式传输的 Kafka 主题,其中包含您想要的数据。如果您希望将其作为处理管道的一部分,您可以使用 Kafka Connect 将其放入 Kafka 主题中。

阅读使用 KSQL 的示例here,并尝试一下here

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-01
    • 2019-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多