【问题标题】:Tornado AsyncHTTPClient performance degradationTornado AsyncHTTPClient 性能下降
【发布时间】:2019-03-25 21:14:29
【问题描述】:

设置: Python 2.7.15、Tornado 5.1

我有一台网络服务器机器,每秒处理约 40 个/recommend 请求。 平均响应时间为 25 毫秒,但差异很大(某些请求可能需要 500 毫秒以上)。

每个请求在内部生成 1-8 个 Elasticsearch 查询(HTTP 请求)。 每个 Elasticsearch 查询可能需要 1-150 毫秒。

Elasticsearch 请求通过elasticsearch-dsl 库同步处理。

目标是减少 i/o 等待时间(对 Elasticsearch 的查询)并每秒处理更多请求,这样我就可以减少机器数量。 一件事是不可接受的——我不想增加平均处理时间(25ms)。

我在网络上找到了一些 tornado-elasticsearch 实现,但由于我只需要使用一个端点来连接 Elasticsearch (/_search),我正在尝试单独完成。

下面是我的网络服务器的退化实现。在相同负载(每秒约 40 个请求)的情况下,平均请求响应时间增加到 200ms

深入研究,我发现内部异步处理时间(对 Elasticsearch 的查询)不稳定,每个 fetch 调用所需的时间可能不同,总平均值(在 ab 负载测试中)很高.

我正在使用ab 来模拟负载并通过打印当前fetch 处理时间、平均fetch 处理时间和最大处理时间在内部对其进行测量。 一次执行一个请求时(并发 1): ab -p es-query-rcom.txt -T application/json -n 1000 -c 1 -k 'http://localhost:5002/recommend'

我的打印看起来像:[avg req_time: 3, dur: 3] [current req_time: 2, dur: 3] [max req_time: 125, dur: 125] reqs: 8000

但是当我尝试增加并发(最多 8 个)时:ab -p es-query-rcom.txt -T application/json -n 1000 -c 8 -k 'http://localhost:5002/recommend'

现在我的打印看起来像:[avg req_time: 6, dur: 13] [current req_time: 4, dur: 4] [max req_time: 73, dur: 84] reqs: 8000

平均需求现在x2 慢(或根据我的测量x4)! 我在这里想念什么?为什么我会看到这种退化?

web_server.py:

import tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.options import define, options
from tornado.httpserver import HTTPServer
from web_handler import WebHandler

SERVICE_NAME = 'web_server'
NUM_OF_PROCESSES = 1


class Statistics(object):
    def __init__(self):
        self.total_requests = 0
        self.total_requests_time = 0
        self.total_duration = 0
        self.max_time = 0
        self.max_duration = 0


class RcomService(object):
    def __init__(self):
        print 'initializing RcomService...'
        AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=3)
        self.stats = Statistics()

    def start(self, port):
        define("port", default=port, type=int)
        db = self.get_db(self.stats)
        routes = self.generate_routes(db)
        app = tornado.web.Application(routes)
        http_server = HTTPServer(app, xheaders=True)
        http_server.bind(options.port)
        http_server.start(NUM_OF_PROCESSES)
        tornado.ioloop.IOLoop.current().start()

    @staticmethod
    def generate_routes(db):
        return [
            (r"/recommend", WebHandler, dict(db=db))
        ]

    @staticmethod
    def get_db(stats):
        return {
            'stats': stats
        }


def main():
    port = 5002
    print('starting %s on port %s', SERVICE_NAME, port)

    rcom_service = RcomService()
    rcom_service.start(port)


if __name__ == '__main__':
    main()

web_handler.py:

import time
import ujson
from tornado import gen
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
from tornado.web import RequestHandler


class WebHandler(RequestHandler):
    def initialize(self, db):
        self.stats = db['stats']

    @coroutine
    def post(self, *args, **kwargs):
        result = yield self.wrapper_innear_loop([{}, {}, {}, {}, {}, {}, {}, {}])  # dummy queries (empty)
        self.write({
            'res': result
        })

    @coroutine
    def wrapper_innear_loop(self, queries):
        result = []
        for q in queries:  # queries are performed serially 
            res = yield self.async_fetch_gen(q)
            result.append(res)
        raise gen.Return(result)

    @coroutine
    def async_fetch_gen(self, query):
        url = 'http://localhost:9200/my_index/_search'

        headers = {
                'Content-Type': 'application/json',
                'Connection': 'keep-alive'
        }

        http_client = AsyncHTTPClient()
        start_time = int(round(time.time() * 1000))
        response = yield http_client.fetch(url, method='POST', body=ujson.dumps(query), headers=headers)
        end_time = int(round(time.time() * 1000))
        duration = end_time - start_time
        body = ujson.loads(response.body)
        request_time = int(round(response.request_time * 1000))
        self.stats.total_requests += 1
        self.stats.total_requests_time += request_time
        self.stats.total_duration += duration
        if self.stats.max_time < request_time:
            self.stats.max_time = request_time
        if self.stats.max_duration < duration:
            self.stats.max_duration = duration
        duration_avg = self.stats.total_duration / self.stats.total_requests
        time_avg = self.stats.total_requests_time / self.stats.total_requests
        print "[avg req_time: " + str(time_avg) + ", dur: " + str(duration_avg) + \
              "] [current req_time: " + str(request_time) + ", dur: " + str(duration) + "] [max req_time: " + \
              str(self.stats.max_time) + ", dur: " + str(self.stats.max_duration) + "] reqs: " + \
              str(self.stats.total_requests)
        raise gen.Return(body)

我尝试使用 async 类(Simplecurl),max_clients 大小,但我不明白在我的情况下最好的曲调是什么。 但是

【问题讨论】:

  • 我敢打赌,它是弹性的,而不是 AsyncHTTP 的退化。您是否尝试过监控 Elastic 的性能?
  • @Fian 是的,不是,我在每个查询响应的毫秒内打印了 took

标签: python python-2.7 tornado


【解决方案1】:

时间增加可能是因为并发==1 时,CPU 未充分利用,而 c==8 时 CPU 利用率 100% 以上,无法赶上所有请求。例如,抽象 CPU 每秒可以处理 1000 个操作,发送请求需要 50 个 CPU 操作,读取请求结果也需要 50 个 CPU 操作。当您有 5 个 RPS 时,您的 CPU 利用率为 50%,平均请求时间为 50 毫秒(发送请求)+ 请求时间 + 50 毫秒(读取请求)。但是,例如,当您有 40 RPS(5 RPS 的 8 倍以上)时,您的 CPU 将被 400% 过度使用,并且一些已完成的请求将等待解析,因此现在平均请求时间为 50 毫秒 + 请求时间 + CPU 等待时间 + 50 毫秒。

总而言之,我的建议是检查两个负载的 CPU 利用率,并且可以肯定的是,分析发送请求和解析响应所需的时间,CPU 可能是您的瓶颈。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-02
    • 2016-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多