【问题标题】:Call to async endpoint gets blocked by another thread对异步端点的调用被另一个线程阻塞
【发布时间】:2023-12-04 05:14:01
【问题描述】:

我有一个 Tornado 网络服务,它将每分钟处理大约 500 个请求。所有这些请求都将到达 1 个特定端点。我使用Cython 编译了一个C++ 程序,并在tornado 服务中使用它作为我的处理器引擎。每个发往/check/ 的请求都会触发C++ 程序中的函数调用(我称之为handler),返回值将作为响应发送给用户。

这就是我包装handler 类的方式。重要的一点是我没有在__init__ 中实例化handler。我的龙卷风代码中有另一条路线,我想在授权请求到达该路线后开始加载 DataStructure。 (例如/reload/


executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()



class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query

        if not checker_instance.is_live:
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return

        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]

        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None

        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                    error = True
                    message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'

        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )
    checker_instance = CheckerInstance()

我希望该服务在checker_instance.renew 开始在另一个线程中运行后继续响应。但这不是发生的事情。当我点击/reload/ 端点并且renew 函数开始工作时,对/check/ 的任何请求都会暂停并等待重新加载过程完成,然后它会再次开始工作。加载 DataStructure 时,服务应处于 fake 模式,并使用他们作为输入发送的相同查询来响应用户。

我已经在我的开发环境中使用 i5 CPU(4 个 CPU 内核)测试了这段代码,它工作得很好!但在生产环境(3 个双线程 CPU 内核)中,/check/ 端点会暂停请求。

【问题讨论】:

    标签: python multithreading asynchronous tornado


    【解决方案1】:

    很难完全跟踪正在处理的事件,因为为了简洁起见,您已经剪掉了一些代码。例如,我在这里没有看到 get_response 实现,所以我不知道它是否正在等待可能依赖于 checker_instance 状态的东西。

    我将探索的一个领域是将checker_instance.renew 传递给run_in_executor 时的线程安全(或看似不存在)。这对我来说是有问题的,因为您正在从一个单独的线程中改变 CheckerInstance 的单个实例的状态。虽然它可能不会明确破坏事情,但似乎这可能会引入奇怪的竞争条件或意外的内存副本,这可能会解释您遇到的意外行为

    如果可能的话,我会让您想要卸载到线程的任何加载行为完全独立,并且在加载数据时,将其作为函数结果返回,然后可以将其反馈给您的 checker_instance。如果您要按原样使用代码执行此操作,则需要等待 run_in_executor 调用其结果,然后更新 checker_instance。这意味着重新加载 GET 请求将等到数据加载完毕。或者,在您的重新加载 GET 请求中,您可以 ioloop.spawn_callback 以这种方式触发 run_in_executor 的函数,从而允许重新加载请求完成而不是等待。

    【讨论】:

    • 感谢您的出色回答,所以基本上您是在建议我实例化 pDataStructureHandler 的一个实例并将其作为函数的结果传递,然后将其反馈给 checker_instance?我会试一试,然后告诉你结果。
    • 关于你的另一个问题,如果get_response 正在等待另一个函数;是的,它确实等待另一个函数来获得最终结果。
    • 我的建议是,如果没有 pDataStructureHandler 如何工作的代码,我建议您重构代码,以便在单独的线程上调用的函数仅接收并返回简单类型,例如没有对象引用的 bools、ints、strings、lists 和 dicts。此外,我建议该方法是一个独立的全局函数,而不是实例方法。虽然这可能不是根本原因,但我的直觉是,每当使用跨线程共享状态时,都会引发您所看到的各种奇怪行为。
    • 我想我可以问的另一个问题是get_response 是否调用了底层 C++ 代码...或者数据是否被加载到 Python 代码中并从 Python 代码中查询?