【问题标题】:rabbitmq / logstash lost messagerabbitmq / logstash 丢失消息
【发布时间】:2016-02-25 09:17:20
【问题描述】:

我有一个rabbitmq,它成功存储了消息,但我的logstash读取队列忽略了我的大部分消息。

RabbitMQ 没问题,我有一个小python脚本来显示所有消息

import pika
i=0
def on_message(channel, method_frame, header_frame, body):
    global i
    print i
    print("Message body", body)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    i+=1

credentials = pika.PlainCredentials('***', '***')
parameters =  pika.ConnectionParameters('***',5672,'logstash', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()
channel.exchange_declare(exchange="logstash", exchange_type="topic", passive=False, durable=True, auto_delete=False)
channel.queue_declare(queue="hbbtv", auto_delete=False, durable=True)
channel.queue_bind(queue="hbbtv", exchange="logstash", routing_key="hbbtv")
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_message, 'hbbtv')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()

我可以看到我所有的消息

12('消息正文', '{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100]\"获取 /services/web/index.php/OPA/categories/ARTEPLUS7/fr HTTP/1.1\" 200 348 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80(Linux armv7l;HbbTV/1.1.1(;飞利浦;;;飞利浦电视;) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 固件/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host" :"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-"," auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","re​​quest":"/services/web/index.php/OPA /categories/ARTEPLUS7/fr","httpversion":"1.1","re​​sponse":"200","bytes":"348","re​​ferrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"","agent":" \"歌剧/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 飞利浦电视/2.1.1 固件/003.015.000.001(飞利浦电视, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2": "FR","country_code3":"FRA","country_name":"France","continent_code":"EU","re​​gion_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001, “经度”:7.787399999999991,“时区”:“欧洲/巴黎”,“real_region_name”:“阿尔萨斯”,“位置”:[7.787399999999991,48.60040000000001]}}') 13('邮件正文','{"message":"212.95.70.118 - - [25/Feb/2016:11:19:53 +0100]\"获取 /services/web/index.php/OPA/videos/highlights/6/ARTEPLUS7/de/GE HTTP/1.1\" 500 4519 \"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\" \"Opera/9.80(Linux armv7l;HbbTV/1.1.1(;飞利浦;;;飞利浦电视;) CE-HTML/1.0 NETTV/4.3.1 PhilipsTV/2.1.1 固件/003.015.000.001 (PhilipsTV, 2.1.1,) en) Presto/2.12.362 Version/12.11 \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host" :"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.118","ident":"-"," auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","re​​quest":"/services/web/index.php/OPA /videos/highlights/6/ARTEPLUS7/de/GE","httpversion":"1.1","re​​sponse":"500","bytes":"4519","re​​ferrer":"\"http://www.arte.tv/hbbtvv2/notv/cehtml/index.cehtml?lang=de_DE&page=PLUS7&tv=false\"" "代理":"\"Opera/9.80 (Linux armv7l; HbbTV/1.1.1 (; Philips; ; ; PhilipsTV; ) CE-HTML/1.0 NETTV/4.3.1 飞利浦电视/2.1.1 固件/003.015.000.001(飞利浦电视, 2.1.1,) en) Presto/2.12.362 Version/12.11 \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.118","country_code2": "FR","country_code3":"FRA","country_name":"France","continent_code":"EU","re​​gion_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001, “经度”:7.787399999999991,“时区”:“欧洲/巴黎”,“real_region_name”:“阿尔萨斯”,“位置”:[7.787399999999991,48.60040000000001]}}') 14('邮件正文','{"message":"212.95.70.119 - - [25/Feb/2016:11:19:53 +0100]\"获取 /OPA/getOPAData.php?url=videoStreams%3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT %26kind%3DSHOW%26availableScreens%3Dtv%26fields%3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-03-A.jsonfrFR_3. HTTP/1.1\" 200 5508 \"-\" \"Mozilla/5.0 (Linux; Tizen 2.3; 智能集线器;智能电视;智能电视;你; Maple2012) AppleWebKit/538.1+ (KHTML, 像壁虎)TV Safari/538.1+ \" hbbtvdyn.arte.tv","@version":"1","@timestamp":"2016-02-25T10:19:53.000Z","path":"/data/logs/access","host" :"arte-hbbtvdyn-web1.sdv.fr","type":"apache-access","application":"hbbtv","clientip":"212.95.70.119","ident":"-"," auth":"-","timestamp":"25/Feb/2016:11:19:53 +0100","verb":"GET","re​​quest":"/OPA/getOPAData.php?url=videoStreams %3Flanguage%3Dfr%26protocol%3DHTTP%26mediaType%3Dmp4%26quality%3DEQ%2CSQ%2CHQ%26profileAmm%3D%24nin%3AAMM-YTFR-HAB%2CAMM-YTFR%2CAMM-DT%26kind%3DSHOW%26availableScreens%3Dtv%26fields %3DprogramId%2Curl%2Cquality%2CaudioSlot%2CaudioCode%2CaudioLabel%2CaudioShortLabel%2Cchannel%26programId%3D048353-033-A%26platform%3DARTEPLUS7&filename=PLUS7_stream_048353-033-A_fr_FR.json","httpversion":"1.1","re​​sponse": "200","bytes":"5508","re​​ferrer":"\"-\"","agent":"\"Mozilla/5.0 (Linux;Tizen 2.3;SmartHub;SMART-TV;SmartTV;U;Maple2012) AppleWebKit/538.1+ (KHTML, like Gecko) TV Safari/538.1+ \"","targethost":"hbbtvdyn.arte.tv","geoip":{"ip":"212.95.70.119","country_code2":"FR","country_code3":"FRA","country_name" :"France","continent_code":"EU","re​​gion_name":"C1","city_name":"Strasbourg","latitude":48.60040000000001,"longitude":7.787399999999991,"timezone":"Europe/Paris" ,"real_region_name":"阿尔萨斯","location":[7.787399999999991,48.60040000000001]}}')

具有良好的速率消息(每秒几个),我绝对没有 grok 解析失败。

所以问题发生在 logstash 读取消息时。问题是

  • 很多消息丢失
  • 所有消息都有_grokparsefailure,即使 完成了

logstash的输入部分是

rabbitmq {
    host=>"arte-elasticlog.sdv.fr"
    user=>"***"
    password=>"***"
    queue=>"hbbtv"
    vhost=>"logstash"
    port=>5672
    auto_delete=>false
    durable=>true
    type => "rabbit_hbbtv"
  }

【问题讨论】:

    标签: rabbitmq logstash pika


    【解决方案1】:

    _grokparsefailure 表示无法解析消息。表示消息已成功从队列中读取,但您的 grok 表达式有问题或无法应用于您的消息内容。

    还有一点是rabbitmq输入的默认编解码器是“json”,如果你的rabbitmq消息内容不是json,你应该设置你输入的编解码器例如:

    编解码器 => 普通 {}

    【讨论】:

    【解决方案2】:

    问题是由于我的 logstash 过滤器, 我有两个 apache 访问权限,但模式不同,所以当 logstash 尝试解析消息时,有时他有很好的模式 --> 在 ES 中,有时没有 --> 没有消息。

    现在,对于我所有不同的日志,我添加(添加字段)

    application-->"my application name"
    

    在我的输入中,我所有的 grok 过滤器都取决于应用程序。

    现在一切都很好,感谢您的帮助。

    【讨论】: