【问题标题】:Python: Pre-loading memoryPython:预加载内存
【发布时间】:2021-08-25 13:27:43
【问题描述】:

我有一个 python 程序,我需要在其中加载和反序列化一个 1GB 的 pickle 文件。这需要 20 秒的时间,我想要一种机制,让泡菜的内容可以随时使用。我看过shared_memory,但所有使用它的例子似乎都涉及numpy,我的项目没有使用numpy。使用shared_memory 或其他方式实现此目的的最简单和最干净的方法是什么?

这就是我现在加载数据的方式(每次运行时):

def load_pickle(pickle_name):
    return pickle.load(open(DATA_ROOT + pickle_name, 'rb'))

我希望能够在两次运行之间编辑模拟代码,而无需重新加载 pickle。我一直在搞乱importlib.reload,但对于一个包含许多文件的大型 Python 程序来说,它似乎真的不能很好地工作:

def main():
    data_manager.load_data()
    run_simulation()
    while True:
        try:
            importlib.reload(simulation)
            run_simulation()
        except:
        print(traceback.format_exc())
        print('Press enter to re-run main.py, CTRL-C to exit')
        sys.stdin.readline()

【问题讨论】:

  • 数据是什么?您需要一次性加载所有内容吗?
  • 看来shared_memory 将信息存储为字节缓冲区。如果您不尝试共享数组,那么您可能必须再次重新序列化数据才能保存在那里。
  • 我不明白您要解决什么问题。如果数据需要“随时可用”,那么为什么首先要对其进行腌制-而不是仅仅保留对象?为什么要重新启动程序,尤其是在需要避免加载时间的情况下?
  • 有什么阻止您拥有主程序并将模拟重新格式化为要导入的类吗?然后让主程序在加载数据的情况下一直运行(并在启动时启动),并且在您想要模拟的任何时候,*重新导入新的模拟类(如果可能),复制数据并传入。跨度>
  • 您说您的代码不使用numpy,但它使用的是什么?您需要在运行之间保存的这种海量数据结构是什么?您将无法将整个 Python 对象保存到某种共享内存空间中,如果您尝试过,您会严重破坏解释器的内存管理。但是,根据您的数据实际情况,您也许可以分享一些东西,但我们无法知道它会是什么,而无需对数据有所了解。

标签: python shared-memory


【解决方案1】:

这可能是XY problem,其来源是假设您必须使用泡菜;由于它们管理依赖项的方式,它们处理起来很糟糕,因此对于任何长期数据存储来说基本上都是一个糟糕的选择

源财务数据几乎可以肯定是以某种表格形式开始的,因此可以以更友好的格式请求它

A simple middleware 同时反序列化和重新序列化泡菜将平滑过渡

input -> load pickle -> write -> output

将您的工作流程转换为使用设计为efficient to read and write 的 Parquet 或 Feather 几乎肯定会对您的加载速度产生相当大的影响

更多相关链接


您也可以使用hickle 来实现这一点,它将在内部使用 HDH5 格式,理想情况下使其比 pickle 快得多,同时仍然表现得像一个

【讨论】:

  • 我不知道为什么,但是 hickle 并不是 pickle 的替代品——我不得不重写代码——然后它超级慢
  • 绝对不是临时的,但这样的解决方案可以缓和政治,因为它很容易比较
【解决方案2】:

据我了解:

  • 需要加载一些东西
  • 它需要经常加载,因为包含使用它的代码的文件经常被编辑
  • 你不想等到它每次都被加载

也许这样的解决方案对你来说没问题。

您可以用这种方式编写脚本加载器文件(在 Python 3.8 上测试):

import importlib.util, traceback, sys, gc

# Example data
import pickle
something = pickle.loads(pickle.dumps([123]))

if __name__ == '__main__':
    try:
        mod_path = sys.argv[1]
    except IndexError:
        print('Usage: python3', sys.argv[0], 'PATH_TO_SCRIPT')
        exit(1)

    modules_before = list(sys.modules.keys())
    argv = sys.argv[1:]
    while True:
        MOD_NAME = '__main__'
        spec = importlib.util.spec_from_file_location(MOD_NAME, mod_path)
        mod = importlib.util.module_from_spec(spec)

        # Change to needed global name in the target module
        mod.something = something
        
        sys.modules[MOD_NAME] = mod
        sys.argv = argv
        try:
            spec.loader.exec_module(mod)
        except:
            traceback.print_exc()
        del mod, spec
        modules_after = list(sys.modules.keys())
        for k in modules_after:
            if k not in modules_before:
                del sys.modules[k]
        gc.collect()
        print('Press enter to re-run, CTRL-C to exit')
        sys.stdin.readline()

模块示例:

# Change 1 to some different number when first script is running and press enter
something[0] += 1 
print(something)

应该可以。并且应该将pickle的重新加载时间减少到接近零?

UPD 添加接受带有命令行参数的脚本名称的可能性

【讨论】:

  • 这个想法很棒,但在实践中似乎行不通。除非我退出并重新启动,否则我对程序中任何文件所做的任何更改都不会得到反映。
  • @etayluz 奇怪。您可以制作一些代码示例或其他任何内容来显示它不工作的方式吗?我不确定我是否理解,在这种情况下这个脚本应该如何失败,因为它应该从字面上卸载所有加载的模块。或者它以某种方式崩溃?或者它以某种方式重新运行相同的代码?东西。
  • @etayluz 添加了gc.collect() 以防万一。不确定它是否会改变一些东西。我不知道如何以任何方式修复我从未见过的东西:D
  • + 在脚本执行时增加了 ctrl+c 的可能性。因此,需要一个双 ctrl+c 来停止执行。
  • 您的代码似乎非常适合一个文件:mod_name, mod_path = 'some_file', 'some_file.py' - 但我的程序有大约 50 个文件。如何重新加载每个文件?
【解决方案3】:

您可以利用多处理在子进程内运行模拟,并利用 copy-on-write benefits of forking 在开始时仅取消/处理一次数据:

import multiprocessing
import pickle


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')


# Load data once, in the parent process
data = pickle.load(open(DATA_ROOT + pickle_name, 'rb'))


def _run_simulation(_):
    # Wrapper for `run_simulation` that takes one argument. The function passed
    # into `multiprocessing.Pool.map` must take one argument.
    run_simulation()


with mp.Pool() as pool:
    pool.map(_run_simulation, range(num_simulations))

如果你想参数化每个模拟运行,你可以这样做:

import multiprocessing
import pickle


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork')


# Load data once, in the parent process
data = pickle.load(open(DATA_ROOT + pickle_name, 'rb'))


with mp.Pool() as pool:
    simulations = ('arg for simulation run', 'arg for another simulation run')
    pool.map(run_simulation, simulations)

这样run_simulation 函数将从simulations 元组中传递值,这可以允许每个模拟运行使用不同的参数,或者甚至只是为每个运行分配一个名称的ID 号用于记录/保存目的。

整个方法依赖于可用的分叉。有关将 fork 与 Python 的内置多处理库一起使用的更多信息,请参阅the docs about contexts and start methods。由于文档中描述的原因,您可能还需要考虑使用forkserver 多处理上下文(通过使用mp = multiprocessing.get_context('fork'))。


如果您不想并行运行模拟,则可以采用这种方法。关键是,为了只处理一次数据,您必须在处理数据的进程或其子进程之一中调用run_simulation

例如,如果您想编辑 run_simulation 所做的事情,然后在您的命令下再次运行它,您可以使用类似以下的代码来完成:

main.py:

import multiprocessing
from multiprocessing.connection import Connection
import pickle

from data import load_data


# Load/process data in the parent process
load_data()
# Now child processes can access the data nearly instantaneously


# Need to use forking to get copy-on-write benefits!
mp = multiprocessing.get_context('fork') # Consider using 'forkserver' instead


# This is only ever run in child processes
def load_and_run_simulation(result_pipe: Connection) -> None:
    # Import `run_simulation` here to allow it to change between runs
    from simulation import run_simulation
    # Ensure that simulation has not been imported in the parent process, as if
    # so, it will be available in the child process just like the data!
    try:
        run_simulation()
    except Exception as ex:
        # Send the exception to the parent process
        result_pipe.send(ex)
    else:
        # Send this because the parent is waiting for a response
        result_pipe.send(None)


def run_simulation_in_child_process() -> None:
    result_pipe_output, result_pipe_input = mp.Pipe(duplex=False)
    proc = mp.Process(
        target=load_and_run_simulation,
        args=(result_pipe_input,)
    )
    print('Starting simulation')
    proc.start()
    try:
        # The `recv` below will wait until the child process sends sometime, or
        # will raise `EOFError` if the child process crashes suddenly without
        # sending an exception (e.g. if a segfault occurs)
        result = result_pipe_output.recv()
        if isinstance(result, Exception):
            raise result # raise exceptions from the child process
        proc.join()
    except KeyboardInterrupt:
        print("Caught 'KeyboardInterrupt'; terminating simulation")
        proc.terminate()
    print('Simulation finished')


if __name__ == '__main__':
    while True:
        choice = input('\n'.join((
            'What would you like to do?',
            '1) Run simulation',
            '2) Exit\n',
        )))
        if choice.strip() == '1':
            run_simulation_in_child_process()
        elif choice.strip() == '2':
            exit()
        else:
            print(f'Invalid option: {choice!r}')

data.py:

from functools import lru_cache

# <obtain 'DATA_ROOT' and 'pickle_name' here>


@lru_cache
def load_data():
    with open(DATA_ROOT + pickle_name, 'rb') as f:
        return pickle.load(f)

simulation.py:

from data import load_data


# This call will complete almost instantaneously if `main.py` has been run
data = load_data()


def run_simulation():
    # Run the simulation using the data, which will already be loaded if this
    # is run from `main.py`.
    # Anything printed here will appear in the output of the parent process.
    # Exceptions raised here will be caught/handled by the parent process.
    ...

上面详述的三个文件都应该在同一个目录中,旁边还有一个可以为空的__init__.py 文件。 main.py 文件可以重命名为您想要的任何名称,并且是该程序的主要入口点。您可以直接运行simulation.py,但这会导致加载/处理数据花费很长时间,这是您最初遇到的问题。在运行main.py 时,可以编辑文件simulation.py,因为每次从main.py 运行模拟时都会重新加载它。

对于 macOS 用户:在 macOS 上分叉可能有点错误,这就是为什么 Python 默认使用 spawn 方法在 macOS 上进行多处理,但仍然支持 forkforkserver。如果您遇到崩溃或与多处理相关的问题,请尝试将 OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES 添加到您的环境中。详情请见https://stackoverflow.com/a/52230415/5946921

【讨论】:

  • @etayluz 我已经编辑了我的答案,以添加一种我认为更符合您的用例的方法。如果您对此有任何疑问,或者有什么我可以提供的帮助,请告诉我。
  • 谢谢!不幸的是,我认为它不会起作用,因为我需要在每次使用这种方法编辑文件后重新启动。如果我必须重新启动,我必须重新加载数据。
  • @etayluz 不,你没有。请参阅我的答案底部的方法。每次都重新导入包含run_simulation 的文件。您可以编辑该文件,然后在提示符处输入“1”以重新运行它。如果之前的运行还在运行,可以输入“ctrl+c”停止,然后在提示符处选择“1”。
  • 谢谢!请看我的问题 - 我已经尝试过这种技术,它对于一个有很多文件的程序来说很奇怪。一些模块会重新加载,但其他模块不会。根据我的经验,这不是一种可靠或可扩展的技术。在这一点上,我更倾向于 Producer->Consumer 共享内存范例。
  • 我明白你在说什么了!感谢您澄清这一点。让我明天试试这个(这里已经很晚了)——然后回复你。谢谢!
【解决方案4】:

这是我在写这个答案时的假设:

  1. 您的财务数据是在复杂操作后生成的,您希望结果保留在内存中
  2. 使用的代码必须能够快速访问该数据
  3. 您希望使用共享内存

这里是代码(不言自明,我相信

数据结构

'''
Nested class definitions to simulate complex data
'''

class A:
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def get_attr(self):
        return self.name, self.value

    def set_attr(self, n, v):
        self.name = n
        self.value = v


class B(A):
    def __init__(self, name, value, status):
        super(B, self).__init__(name, value)
        self.status = status

    def set_attr(self, n, v, s):
        A.set_attr(self, n,v)
        self.status = s

    def get_attr(self):
        print('\nName : {}\nValue : {}\nStatus : {}'.format(self.name, self.value, self.status))

生产者.py

from multiprocessing import shared_memory as sm
import time
import pickle as pkl
import pickletools as ptool
import sys
from class_defs import B


def main():

    # Data Creation/Processing
    obj1 = B('Sam Reagon', '2703', 'Active')
    #print(sys.getsizeof(obj1))
    obj1.set_attr('Ronald Reagon', '1023', 'INACTIVE')
    obj1.get_attr()

    ###### real deal #########

    # Create pickle string
    byte_str = pkl.dumps(obj=obj1, protocol=pkl.HIGHEST_PROTOCOL, buffer_callback=None)
    
    # compress the pickle
    #byte_str_opt = ptool.optimize(byte_str)
    byte_str_opt = bytearray(byte_str)
    
    # place data on shared memory buffer
    shm_a = sm.SharedMemory(name='datashare', create=True, size=len(byte_str_opt))#sys.getsizeof(obj1))
    buffer = shm_a.buf
    buffer[:] = byte_str_opt[:]

    #print(shm_a.name)               # the string to access the shared memory
    #print(len(shm_a.buf[:]))

    # Just an infinite loop to keep the producer running, like a server
    #   a better approach would be to explore use of shared memory manager
    while(True):
        time.sleep(60)


if __name__ == '__main__':
    main()

Consumer.py

from multiprocessing import shared_memory as sm
import pickle as pkl
from class_defs import B    # we need this so that while unpickling, the object structure is understood


def main():
    shm_b = sm.SharedMemory(name='datashare')
    byte_str = bytes(shm_b.buf[:])              # convert the shared_memory buffer to a bytes array

    obj = pkl.loads(data=byte_str)              # un-pickle the bytes array (as a data source)

    print(obj.name, obj.value, obj.status)      # get the values of the object attributes


if __name__ == '__main__':
    main()

Producer.py 在一个终端中执行时,它将为共享内存发出一个字符串标识符(例如,wnsm_86cd09d4)。在 Consumer.py 中输入此字符串,然后在另一个终端中执行。

只需在一个终端上运行 Producer.py 并在 同一台 机器上的另一个终端上运行 Consumer.py

我希望这是你想要的!

【讨论】:

  • 这是在 Windows 10 x64 环境下的 Python 3.8(通过 anaconda 4.8.4)上测试的
  • Traceback(最近一次调用最后一次):文件“/Users/etayluz/stocks/src/data_loader.py”,第 18 行,在 byte_str_opt = ptool.optimize(byte_str) 文件“/ Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/pickletools.py”,第 2337 行,在 _genops(p,yield_end_pos=True) 中优化操作码、arg、pos、end_pos:文件“/Library /Frameworks/Python.framework/Versions/3.9/lib/python3.9/pickletools.py”,第 2279 行,在 _genops 代码 = data.read(1) AttributeError: 'NoneType' 对象没有属性 'read'
  • 你知道上面的错误是关于什么的吗? ptool
  • 尝试删除该语句。另外,尝试打印 pkl.dumps 语句的输出长度 - 我猜它是空的( from AttributeError: 'NoneType' object ...
  • 是的 - 这是我的错误,我深表歉意。
【解决方案5】:

这不是问题的确切答案,因为 Q 看起来像 pickle 和 SHM 是必需的,但其他人走了这条路,所以我将分享我的一个技巧。它可能会帮助你。无论如何,这里有一些很好的解决方案使用泡菜和 SHM。关于这一点,我只能提供更多相同的内容。同样的意大利面,稍加调味汁。

我在处理您的情况时采用的两个技巧如下。

首先是使用 sqlite3 代替 pickle。您甚至可以使用 sqlite 轻松开发用于替换的模块。不错的是,数据将使用本机 Python 类型插入和选择,您可以使用转换器和适配器函数定义自己的数据,这些函数将使用您选择的序列化方法来存储复杂对象。可以是 pickle 或 json 之类的。

我所做的是定义一个类,其数据通过构造函数的 *args 和/或 **kwargs 传入。它代表我需要的任何 obj 模型,然后我从“select * from table;”中选择行我的数据库并让 Python 在新对象初始化期间解包数据。使用数据类型转换加载大量数据,即使是自定义的也快得惊人。 sqlite 将为您管理缓冲和 IO 内容,并且比 pickle 更快。诀窍是尽可能快地构建要填充和启动的对象。我要么继承 dict() 要么使用插槽来加快速度。 sqlite3 是 Python 自带的,所以这也是一个好处。

我的另一种方法是使用 ZIP 文件和结构模块。 您构建一个包含多个文件的 ZIP 文件。例如。对于超过 400000 个单词的发音词典,我想要一个 dict() 对象。所以我使用一个文件,比方说,lengths.dat,我在其中以二进制格式为每一对定义了一个键的长度和一个值的长度。然后我有一个单词文件和一个发音文件一个接一个。 当我从文件加载时,我读取长度并使用它们从其他两个文件中构造单词的 dict() 及其发音。索引 bytes() 很快,因此创建这样的字典非常快。如果需要考虑磁盘空间,您甚至可以将其压缩,但会导致一些速度损失。

这两种方法在磁盘上占用的空间都比 pickle 少。 第二种方法将要求您将所需的所有数据读入 RAM,然后您将构建对象,这将占用几乎两倍于数据占用的 RAM,然后您当然可以丢弃原始数据。但总的来说,所需要的不应该超过泡菜所需要的。至于 RAM,如果需要,操作系统将使用虚拟内存/SWAP 管理几乎所有内容。

哦,是的,我使用了第三个技巧。当我按上述方式构建 ZIP 文件或在构建对象时需要额外反序列化的任何其他内容,并且此类对象的数量很大时,我会引入延迟加载。 IE。假设我们有一个包含序列化对象的大文件。您让程序加载所有数据并将其分配给您保存在 list() 或 dict() 中的每个对象。 您编写类的方式是,当第一次要求对象提供数据时,它会解压缩其原始数据,反序列化等等,从 RAM 中删除原始数据,然后返回您的结果。因此,在您真正需要相关数据之前,您不会浪费加载时间,对于用户而言,这比启动一个流程需要 20 秒的时间要少得多。

【讨论】:

  • 无意冒犯,但是,我认为 OP 会更喜欢代码而不是 prose
【解决方案6】:

添加另一个具有挑战性的假设答案,它可能是 您正在从中读取文件的位置,这会产生很大的不同

1G 在当今系统中并不是大量数据;加载 20 秒时,速度仅为 50MB/s,即使是最慢的磁盘也只能提供一小部分

您可能会发现您实际上有一个慢速磁盘或某种类型的网络共享作为您真正的瓶颈,并且更改为更快的存储介质或压缩数据(可能使用 gzip)对读写有很大的影响

【讨论】:

  • 感谢您的评论。我在 2018 MacBook Pro 上本地运行。这里没有这样的问题。
【解决方案7】:

您可以使用可共享列表: 因此,您将运行 1 个 python 程序,它将加载文件并将其保存在内存中,另一个 python 程序可以从内存中获取文件。您的数据,无论它是什么,您都可以将其加载到字典中,然后将其转储为 json,然后重新加载 json。 所以

程序1

import pickle
import json
from multiprocessing.managers import SharedMemoryManager
YOUR_DATA=pickle.load(open(DATA_ROOT + pickle_name, 'rb'))
data_dict={'DATA':YOUR_DATA}
data_dict_json=json.dumps(data_dict)
smm = SharedMemoryManager()
smm.start() 
sl = smm.ShareableList(['alpha','beta',data_dict_json])
print (sl)
#smm.shutdown() commenting shutdown now but you will need to do it eventually

输出将如下所示

#OUTPUT
>>>ShareableList(['alpha', 'beta', "your data in json format"], name='psm_12abcd')

现在在 Program2 中:

from multiprocessing import shared_memory
load_from_mem=shared_memory.ShareableList(name='psm_12abcd')
load_from_mem[1]
#OUTPUT
'beta'
load_from_mem[2]
#OUTPUT
yourdataindictionaryformat


你可以在这里寻找更多 https://docs.python.org/3/library/multiprocessing.shared_memory.html

【讨论】:

  • 你确定这个比例吗?我希望Manger 代码能够腌制并通过 IPC 发送提问者需要有效使用的相同数据,因此将其预加载到一个程序中可能不会添加任何内容。
  • 它预加载在内存中。当前,提问者每次运行程序时都必须从 DISK 加载数据。使用这种方法,数据将被加载到内存中,并为另一个程序加载该数据提供参考。他需要从内存中获取文件的东西。而这个 sn-p 正在达到这个目的。考虑到他在 os 进程后有足够的内存,它将运行 1GB 的数据
  • File "/Users/etayluz/stocks/src/data_loader.py", line 19, in main sl = smm.ShareableList(['alpha', 'beta', data_dict_json]) File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py", line 1363, in ShareableList sl = shared_memory.ShareableList(sequence) File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/shared_memory.py", line 308, in __init__ assert sum(len(fmt) &lt;= 8 for fmt in _formats) == self._list_len AssertionError
  • @ibadia 知道这个错误是关于什么的吗?
【解决方案8】:

将未腌制数据存储在内存中的另一种方法是将腌制数据存储在 ramdisk 中,只要大部分时间开销来自磁盘读取。示例代码(在终端中运行)如下。

sudo mkdir mnt/pickle
mount -o size=1536M -t tmpfs none /mnt/pickle
cp path/to/pickle.pkl mnt/pickle/pickle.pkl 

然后您可以通过mnt/pickle/pickle.pkl 访问泡菜。请注意,您可以将文件名和扩展名更改为您想要的任何内容。如果磁盘读取不是最大的瓶颈,您可能看不到速度的提高。如果内存不足,可以尝试调小ramdisk的大小(我设置为1536mb,也就是1.5gb)

【讨论】:

  • 请注意,这仅适用于 linux(特别是 ubuntu;我不确定它如何推广到哪里)。如果您使用的是 Windows 或 Mac,则需要遵循不同的流程。
  • 这看起来很有趣——但我的程序也需要在 Windows 上运行。我需要一个跨平台的解决方案
猜你喜欢
  • 2011-06-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多