【问题标题】:Python: Improving performance - Writing to database in seperate threadPython:提高性能 - 在单独的线程中写入数据库
【发布时间】:2021-01-07 20:30:28
【问题描述】:

我正在运行一个 python 应用程序,由于各种原因,我必须将我的程序托管在世界某个地方的服务器上,然后将我的数据库放在另一个地方。

我通过一个简单的脚本进行了测试,从我在邻国的家到数据库服务器,与当我在世界另一端的 python 服务器执行相同的操作时,0,16 秒。 这是一个问题,因为我试图让我的 python 应用程序尽可能快,所以我想知道是否有一种聪明的方法可以做到这一点?

当我同步运行我的代码时,我的程序每次必须写入数据库时​​都在等待,这大约是每秒 3 次,所以时间加起来。是否可以在单独的线程或其他东西中运行与数据库的连接,以便在尝试将数据发送到数据库时不会停止整个程序?或者这可以使用 asyncio 来完成(我没有使用异步代码的经验)?

我真的很难找到解决这个问题的好方法。 提前,非常感谢!

【问题讨论】:

    标签: python mysql python-multithreading mysql-python


    【解决方案1】:

    是的,您可以创建一个在后台执行写入的线程。在您的情况下,有一个队列似乎是合理的,主线程在其中放置要写入的内容,而 db 线程获取并写入它们。队列可以有一个最大深度,这样当有太多东西挂起时,主线程会等待。你也可以做一些不同的事情,比如放弃发生得太快的事情。或者,使用带同步的数据库并写入本地副本。您也可能有机会通过一次提交多个来加快写入速度。

    这是一个工作线程的草图

    import threading
    import queue
    
    class SqlWriterThread(threading.Thread):
    
        def __init__(self, db_connect_info, maxsize=8):
            super().__init__()
            self.db_connect_info = db_connect_info
            self.q = queue.Queue(maxsize)
            # TODO: Can expose q.put directly if you don't need to
            # intercept the call
            # self.put = q.put
            self.start()
    
        def put(self, statement):
            print(f"DEBUG: Putting\n{statement}")
            self.q.put(statement)
    
        def run(self):
            db_conn = None
            while True:
                # get all the statements you can, waiting on first
                statements = [self.q.get()]
                try:
                    while True:
                        statements.append(self.q.get(), block=False)
                except queue.Empty:
                    pass
                try:
                    # early exit before connecting if channel is closed.
                    if statements[0] is None:
                        return
                    if not db_conn:
                        db_conn = do_my_sql_connect()
                    try:
                        print("Debug: Executing\n", "--------\n".join(f"{id(s)} {s}" for s in statements))
                        # todo: need to detect closed connection, then reconnect and resart loop
                        cursor = db_conn.cursor()
                        for statement in statements:
                            if statement is None:
                                return
                            cursor.execute(*statement)
                    finally:
                        cursor.commit()       
                finally:
                    for _ in statements:
                        self.q.task_done()
    
    sql_writer = SqlWriterThread(('user', 'host', 'credentials'))
    sql_writer.put(('execute some stuff',))
    

    【讨论】:

    • 所以我尝试在我的代码中实现这一点,但我遇到了一个奇怪的问题,我对您的代码有疑问。在上面的示例中,您有一个 sql_q 变量,但您从未分配或引用它。这是从哪里来的?或者它应该是 self.q 的错误?我尝试使用 self.q 实现代码,但似乎每次我调用 sql_writer.put 并向 que 添加新语句时,所有语句最终都是彼此的副本,即使我肯定输入了不同的语句。
    • 我刚刚做了一些测试,发现这是由于我的程序在 2 秒内添加到队列中造成的。如果我 time.sleep(2) 队列项目保持原样,但如果我 .put 在 2 秒的间隔内,队列中的所有项目都成为彼此的副本。我该如何解决这个问题,因为等待 2 秒就失去了拥有这个队列系统的全部意义。
    • 是的,应该是self.q。我最初将其编写为函数并将其更改为一个类 - 并在此过程中注入了错误。就像一个二合一的报价。看第二期。
    • 我找不到重复放置的原因,所以添加了一些调试。
    • 很好,我解决了我的问题,所以我多次感谢你提供的好例子。我以前从未听说过python queue,但它看起来很天才,我将来肯定会经常使用它。周末愉快!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-01-13
    • 1970-01-01
    • 2023-03-08
    • 1970-01-01
    相关资源
    最近更新 更多