【Question title】: Obtain Storm Topology Stats programmatically
【Posted】: 2018-12-29 10:43:28
【Question】:

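I am building a monitoring service around my Storm topology and would like to get the number of failed tuples over different time windows, similar to how the Storm UI reports them for the 10m, 3h, and 1d windows.

My monitoring service is currently written in Python, so an answer involving a Python library or something language-agnostic, such as using the CLI or hitting a REST endpoint, would be appreciated. I have looked through the Storm CLI and the documentation, but so far I have found nothing on where the Storm UI actually gets its information from.

Edit: I am running Storm 0.8.2 (unfortunately not under my control), so until we upgrade, the storm-ui-rest-api (released in 0.9.2) is sadly not an option.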

【Comments】:

    Tags: apache-storm


    【Answer 1】:

    Use the Storm UI REST API

    sqlInjection@foo:~$ curl http://$STORM_UI_HOST_AND_PORT/api/v1/topology/summary
    

    {"topologies":[{"id":"topology-1-1436004781","encodedId":"topology-1-1436004781","name":"topology-1","status":"ACTIVE","uptime":"40d 21h 51m 59s","tasksTotal":16,"workersTotal":1,"executorsTotal":10}]}

    sqlInjection@foo:~$ curl http://$STORM_UI_HOST_AND_PORT/api/v1/topology/topology-1-1436004781
    

    {"msgTimeout":30,"spouts":[{"executors":3,"emitted":22336820,"errorLapsedSecs":755996,"completeLatency":"232.052","transferred":22336820,"acked":22340300,"errorPort":6703,"spoutId":"KafkaSpout-removed","tasks":3,"errorHost":"removed","lastError":"java.lang.RuntimeException: java.lang.NullPointerException\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)\n\tat backtype.storm.utils.DisruptorQueue.consumeBatch(Di","errorWorkerLogLink":"http://host:port/log?file=worker-6703.log","failed":0,"encodedSpoutId":"KafkaSpout-removed"}],"executorsTotal":8,"uptime":"67d 21h 15m 2s","encodedId":"topology-1-1436004781","visualizationTable":[{":row":[{":stream":"default",":sani-stream":"default1544803905",":checked":true},{":stream":"__ack_init",":sani-stream":"s__ack_init973324006",":checked":false},{":stream":"__ack_ack",":sani-stream":"s__ack_ack1278315507",":checked":false},{":stream":"__ack_fail",":sani ...removed

    As you can see, you can even capture the last error that occurred in a bolt or spout.
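A minimal Python sketch of reading the same fields from a monitoring script. It relies only on keys visible in the output above (`spouts`, `spoutId`, `failed`, `lastError`); the helper names `fetch_json` and `spout_failures` are made up for illustration, and the sample dictionary is a trimmed copy of that output rather than a live response.

```python
import json
from urllib.request import urlopen


def fetch_json(url, timeout=10):
    """GET a Storm UI REST endpoint and parse the JSON body."""
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def spout_failures(topology):
    """Return (spoutId, failed, lastError) for every spout in a
    /api/v1/topology/<id> response, treating missing values as 0 or ""."""
    rows = []
    for spout in topology.get("spouts", []):
        rows.append((
            spout.get("spoutId"),
            spout.get("failed") or 0,
            spout.get("lastError") or "",
        ))
    return rows


# Trimmed-down response using the same keys as the curl output above.
sample = {
    "spouts": [
        {"spoutId": "KafkaSpout-removed", "failed": 0, "acked": 22340300,
         "lastError": "java.lang.RuntimeException: java.lang.NullPointerException"},
    ],
}

for spout_id, failed, last_error in spout_failures(sample):
    print(spout_id, failed, last_error)
```

Against a live UI, the same function would be fed with something like `fetch_json("http://" + ui_host_and_port + "/api/v1/topology/topology-1-1436004781")`, where `ui_host_and_port` stands in for your own Storm UI address.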

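    【Comments】:

    • This is a good answer; however, it looks like it is only available on Storm 0.9.2+, and unfortunately we are still on 0.8.2.
    • Yeah :/ unfortunately these things are not under my control.
    • Working link for REST API; unfortunately master does not provide one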
    【Answer 2】:

    I get this with Python, and restart the topology if "failed" is too high

    import json, logging, os, time
    try:
        from urllib2 import urlopen          # Python 2
    except ImportError:
        from urllib.request import urlopen   # Python 3

    # host, port, topology_name and monitor_name must be set for your deployment.
    logger = logging.getLogger(__name__)
    count = 0  # consecutive checks in which the failure ratio was too high

    def check_topology():
        global count
        base = 'http://' + host + ':' + port + '/api/v1/topology/'
        # Look up the topology id by its name.
        data_pid = json.loads(urlopen(base + 'summary').read())
        url_pid = None
        for data in data_pid['topologies']:
            if data['name'] == topology_name:
                url_pid = data['id']
                break
        if url_pid is None:
            print('no topology')
            return
        data_content = json.loads(urlopen(base + url_pid).read())
        stats = data_content['topologyStats'][0]
        failed = stats['failed'] or 0   # the API may return null
        acked = stats['acked'] or 0
        # Restart once acked < 10 * failed has been seen on two checks.
        if acked < failed * 10:
            count = count + 1
            if count == 2:
                os.system("monit restart " + monitor_name)
                logger.info('restart at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
                count = 0
    

    If you want to know more, see http://chenshuxiang.applinzi.com/index.php/2017/09/13/storm-ui/
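The snippet in this answer reads only the first entry of `topologyStats`. A hedged sketch of collecting the failed count for every window follows. It assumes each `topologyStats` entry carries a `window` key next to `failed` (window length in seconds as a string, such as "600" for 10m, plus an all-time entry), which is how I understand the Storm UI REST API in 0.9.2+ to behave; check the response from your own version before relying on it. The function name `failed_by_window` and all sample values are made up for illustration, and none of this is available on 0.8.2.

```python
def failed_by_window(topology):
    """Map each stats window to its failed-tuple count.

    Keys are whatever the response puts in "window" (assumed here to be
    seconds as a string, or ":all-time"); a null count is treated as 0.
    """
    result = {}
    for entry in topology.get("topologyStats", []):
        result[entry.get("window")] = entry.get("failed") or 0
    return result


# Hand-written sample; the numbers are invented for illustration only.
sample = {
    "topologyStats": [
        {"window": "600", "acked": 1200, "failed": 3},
        {"window": "10800", "acked": 20000, "failed": None},
        {"window": "86400", "acked": 150000, "failed": 40},
        {"window": ":all-time", "acked": 900000, "failed": 41},
    ],
}

print(failed_by_window(sample))
```

Keying the result by window rather than by list position avoids depending on the order in which the UI happens to return the entries.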
