【发布时间】:2023-12-04 05:14:01
【问题描述】:
我有一个 Tornado 网络服务,它将每分钟处理大约 500 个请求。所有这些请求都将到达 1 个特定端点。我使用Cython 编译了一个C++ 程序,并在tornado 服务中使用它作为我的处理器引擎。每个发往/check/ 的请求都会触发C++ 程序中的函数调用(我称之为handler),返回值将作为响应发送给用户。
这就是我包装handler 类的方式。重要的一点是我没有在__init__ 中实例化handler。我的龙卷风代码中有另一条路线,我想在授权请求到达该路线后开始加载 DataStructure。 (例如/reload/)
executors = ThreadPoolExecutor(max_workers=4)
class CheckerInstance(object):
def __init__(self, *args, **kwargs):
self.handler = None
self.is_loading = False
self.is_live = False
def init(self):
if not self.handler:
self.handler = pDataStructureHandler()
self.handler.add_words_from_file(self.data_file_name)
self.end_loading()
self.go_live()
def renew(self):
self.handler = None
self.init()
class CheckHandler(tornado.web.RequestHandler):
async def get(self):
query = self.get_argument("q", None).encode('utf-8')
answer = query
if not checker_instance.is_live:
self.write(dict(answer=self.get_argument("q", None), confidence=100))
return
checker_response = await checker_instance.get_response(query)
answer = checker_response[0]
confidence = checker_response[1]
if self.request.connection.stream.closed():
return
self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))
def on_connection_close(self):
self.wait_future.cancel()
class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
def prepare(self):
self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')
def new_file_exists(self):
return True
def can_reload(self):
return not checker_instance.is_loading
def get(self):
error = False
message = None
if not self.can_reload():
error = True
message = 'another job is being processed!'
else:
if not self.new_file_exists():
error = True
message = 'no new file found!'
else:
checker_instance.go_fake()
checker_instance.start_loading()
tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
message = 'job started!'
if self.request.connection.stream.closed():
return
self.write(dict(
success=not error, message=message
))
def on_connection_close(self):
self.wait_future.cancel()
def main():
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/check", CheckHandler),
(r"/reload", InstanceReloadHandler),
(r"/health", HealthHandler),
(r"/log-event", SubmitLogHandler),
],
debug=options.debug,
)
checker_instance = CheckerInstance()
我希望该服务在checker_instance.renew 开始在另一个线程中运行后继续响应。但这不是发生的事情。当我点击/reload/ 端点并且renew 函数开始工作时,对/check/ 的任何请求都会暂停并等待重新加载过程完成,然后它会再次开始工作。加载 DataStructure 时,服务应处于 fake 模式,并使用他们作为输入发送的相同查询来响应用户。
我已经在我的开发环境中使用 i5 CPU(4 个 CPU 内核)测试了这段代码,它工作得很好!但在生产环境(3 个双线程 CPU 内核)中,/check/ 端点会暂停请求。
【问题讨论】:
标签: python multithreading asynchronous tornado