【问题标题】:Optimization of python multithreading script - huge memory consumptionpython多线程脚本的优化——巨大的内存消耗
【发布时间】:2012-12-11 08:52:44
【问题描述】:

我有一个包含 800 多行代码的脚本(Django Management-Command)。 这应该从外部 Web 服务导入数据,操作某事。并将其写入 Postgres 数据库。

我使用多线程,因为从 web 服务获取数据不是很快。

有一个线程用于使用批量命令获取数据以获取大量 64 个数据集并将每个数据集写入队列。

一开始同时有一个工作线程处理数据并将其写入数据库。 在主(句柄)类中,有一个 while 循环,它每 5 秒查看一次队列中元素的数量和正在运行的工作线程的数量。 如果队列中有超过 500 个元素且工作线程少于 5 个,则会启动一个新的工作线程。

所有工作线程从队列中获取一项,操作某事,将数据集写入数据库并将一个字符串(最多 14 个字符)附加到不同的队列(#2)。

队列 #2 必须在导入结束时将所有导入的对象标记为新对象,并分别从数据库中删除当前未导入的所有其他项目。

对于数量不超过 200.000 个数据集的数据库,一切正常。 但是,如果有一个包含 1.000.000 个数据集的 DB,则在处理 Hole 脚本期间内存消耗会增加到 8 GB 的 RAM。

有没有办法观察线程和/或队列的内存消耗? 有没有一种方法可以在每个 while 循环之后“清理”内存?

# -*- coding: utf-8 -*-

import os
import threading
import Queue
import time

from optparse import OptionParser, make_option
from decimal import Decimal
from datetime import datetime

from django.core.management import call_command
from django.core.management.base import BaseCommand
from django.conf import settings


def is_someone_alive(thread_list):
    so_alive = False
    for t in thread_list:
        if t.is_alive():
            so_alive = True
    return so_alive


class insert_item(threading.Thread):
    VarLock2 = threading.Lock()

    def __init__(self, queue1, item_still_exist2, name, *args, **options):
        threading.Thread.__init__(self)
        self.options = options
        self.name = name
        self.queue1 = queue1
        self.item_still_exist2 = item_still_exist2

    def run(self):

        while not self.queue1.empty() or getItemBulkThread.isrunning:

            item = self.queue1.get()
            artikelobj, created = Artikel.objects.get_or_create(artikelnr=item['Nr'])

            """
            manipulate data
            """

            self.item_still_exist2.put(artikelobj.artikelnr)

            artikelobj.save()

            self.queue1.task_done()


class getItemBulkThread(threading.Thread):
    isrunning = True
    VarLock = threading.Lock()

    def __init__(self, queue1, name, *args, **options):
        threading.Thread.__init__(self)
        self.options = options
        if self.options['nrStart'] != '':
            self.nrab = self.options['nrStart']
        else:
            self.nrab = ''
        self.name = name
        #self.nrab = '701307'
        self.queue1 = queue1
        self.anz_artikel = 64
        self.max_artikel = 64
        self.skipped = 0
        self.max_skip = 20

    def run(self):

        count_sleep = 0
        while True:

            while self.queue1.qsize() > 5000:
                time.sleep(5)
                count_sleep += 1

            if count_sleep > 0:
                print "~ Artikel-Import %(csleep)sx für 5s pausiert, da Queue-Size > 5000" % {'csleep': count_sleep}
                count_sleep = 0

            try:
                items = getItemBulk()  # from external service

            except Exception as exc1:
                if ('"normal" abort-condition' in str(exc1)):
                    getItemBulkThread.VarLock.acquire()
                    getItemBulkThread.isrunning = False
                    getItemBulkThread.VarLock.release()
                    break
                elif self.anz_artikel > 1:
                    self.anz_artikel /= 2
                    continue
                elif self.skipped <= self.max_skip:
                    self.nrab += 1
                    self.skipped += 1
                    time.sleep(5)
                    continue
                elif self.skipped > self.max_skip:
                    raise Exception("[EXCEPTION] Fehler im Thread: too much items skipped")
                else:
                    getItemBulkThread.VarLock.acquire()
                    getItemBulkThread.isrunning = False
                    getItemBulkThread.VarLock.release()
                    raise

            last_item = len(items) - 1
            self.nrab = items[last_item]['Nr']

            for artikel in items:
                artikel['katItem'] = False
                self.queue1.put(artikel)

            if self.anz_artikel < self.max_artikel:
                self.anz_artikel *= 2
                self.skipped = 0


class Command(BaseCommand):
    """
      Django-mgm-command
    """
    help = u'Import'

    def create_parser(self, prog_name, subcommand):
        """
        Create and return the ``OptionParser`` which will be used to
        parse the arguments to this command.
        """
        return OptionParser(prog=prog_name, usage=self.usage(subcommand),
            version=self.get_version(),
            option_list=self.option_list,
            conflict_handler="resolve")

    def handle(self, *args, **options):

        startzeit = datetime.now()
        anzahl_Artikel_vorher = Artikel.objects.all().count()  # Artikel is a model

        self.options = options

        items_vorher = []

        queue1 = Queue.Queue()
        item_still_exists2 = Queue.Queue()

        running_threads = []

        thread = getItemBulkThread(queue1, name="Artikel", *args, **options)
        running_threads.append(thread)
        thread.daemon = True
        thread.start()

        anz_worker_threads = 1
        anz_max_worker_threads = 5

        insert_threads = [insert_item(queue1, item_still_exists2, name="Worker-%(anz)s" % {'anz': i + 1}, *args, **options) for i in range(anz_worker_threads)]
        for thread in insert_threads:
            running_threads.append(thread)
            thread.setDaemon(True)
            thread.start()

        add_seconds = 5
        element_grenze = 500
        lastelemente = 0
        asc_elemente = 0
        anz_abgearbeitet = 0

        while getItemBulkThread.isrunning or not queue1.empty():
            time.sleep(add_seconds)
            elemente = queue1.qsize()
            akt_zeit = datetime.now()
            diff_zeit = akt_zeit - startzeit
            diff = elemente - lastelemente
            anz_abgearbeitet = item_still_exists2.qsize()
            art_speed = (anz_abgearbeitet / timedelta_total_seconds(diff_zeit)) * 60
            ersetz_var = {'anz': elemente, 'zeit': diff_zeit, 'tstamp': akt_zeit.strftime('%Y.%m.%d-%H:%M:%S'), 'anzw': anz_worker_threads, 'diff': diff, 'anza': anz_abgearbeitet, 'art_speed': art_speed}
            print("%(zeit)s vergangen - %(tstamp)s - %(anz)s Elemente in Queue, Veränderung: %(diff)s - Anz Worker: %(anzw)s - Artikel importiert: %(anza)s - Speed: %(art_speed)02d Art/Min" % ersetz_var)

            if diff > 0:
                asc_elemente += 1
            else:
                asc_elemente = 0
            if asc_elemente > 2 and anz_worker_threads < anz_max_worker_threads and elemente > element_grenze:
                ersetz_var = {'maxw': anz_max_worker_threads, 'nr': anz_worker_threads + 1, 'element_grenze': element_grenze}
                print "~~ 2x in Folge mehr Queue-Elemente als vorher, die max. Anzahl an Workern %(maxw)s noch nicht erreicht und mehr als %(element_grenze)s Elemente in der Queue, daher Start eines neuen Workers (Nr %(nr)s)" % ersetz_var
                anz_worker_threads += 1
                thread = insert_item(queue1, item_still_exists2, name="Worker-%(anz)s" % {'anz': anz_worker_threads}, *args, **options)
                running_threads.append(thread)
                thread.setDaemon(True)
                thread.start()
                asc_elemente = 0
            lastelemente = elemente

        queue1.join()

        items_nachher = []
        while not item_still_exists2.empty():
            item = item_still_exists2.get()
            if item in items_vorher:
                items_nachher.append(item)
                items_vorher.remove(item)
            item_still_exists2.task_done()

        item_still_exists2.join()

        if len(items_vorher) > 0:
            Artikel.objects.filter(artikelnr__in=items_vorher).delete()

        anzahl_Artikel_nachher = Artikel.objects.all().count()
        anzahl_Artikel_diff = anzahl_Artikel_nachher - anzahl_Artikel_vorher

        endzeit = datetime.now()
        dauer = endzeit - startzeit

我已经在某些位置缩写了代码:)

【问题讨论】:

  • 我没有在 Python 中使用多线程的经验,但我读了很多,认为你不应该这样做,因为 Python 在这方面很烂。更好的方法应该是使用像 zeromq 这样的多处理框架,并将你的任务从你的 python 进程传送到你的多处理工具。
  • 用于“从 web 服务获取数据”ioloop 方法,即使用“绿色线程”(geventeventlet 或类似的东西)。
  • Artikel.objects 是否可能将您的对象保存在内存中?
  • 我看到一些地方被添加到 running_threads 列表中,但似乎没有任何东西被删除...
  • @erikb85 实际上这一切都取决于具体情况。 Python 使用真正的操作系统线程并且对调度等没有责任。在某些情况下这是好的,而在其他情况下则不是。 Python 有一个内置的多处理模块,允许在进程之间创建共享对象和/或队列/管道,因此您不必使用外部库。

标签: python multithreading memory


【解决方案1】:

内存消耗过多的一个可能原因是您没有为输入队列设置最大大小。见the maxsize parameter

在相关说明中,您写道:

在主(句柄)类中,有一个 while 循环,每 5 次查找一次 队列中元素的数量和数量的秒数 运行工作线程。如果有超过 500 个元素 队列并且有少于 5 个工作线程,它开始一个新的 工作线程。

创建一个新线程并不一定会增加吞吐量。您应该进行一些测试以确定最佳线程数,结果可能是 1。

【讨论】:

  • 在启动新线程之前有一个简单的逻辑(参见帖子中的代码)。感谢maxsize的提示,我会试试的。我们测试了有意义的工作线程的最大数量。
猜你喜欢
  • 2016-04-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-03
  • 2020-11-18
  • 2014-04-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多