【问题标题】:Synchronization when reading a file using multiprocessing in Python在 Python 中使用多处理读取文件时的同步
【发布时间】:2019-08-09 23:47:16
【问题描述】:

我有一个 python 函数,它从一个大文件中读取随机的 sn-ps 并对其进行一些处理。我希望处理发生在多个进程中,因此使用多处理。我在父进程中打开文件(以二进制模式)并将文件描述符传递给每个子进程,然后使用 multiprocessing.Lock() 来同步对文件的访问。使用单个工作人员时,事情会按预期工作,但是如果有更多工作人员,即使有锁,文件读取也会随机返回错误数据(通常来自文件的一部分,来自文件的另一部分)。此外,文件中的位置(由 file.tell() 返回)经常会被弄乱。这一切都表明了访问描述符的基本竞争条件,但我的理解是 multiprocessing.Lock() 应该防止并发访问它。 file.seek() 和/或 file.read() 是否有某种不包含在锁定/解锁屏障中的异步操作?这是怎么回事?

一个简单的解决方法是让每个进程单独打开文件并获取自己的文件描述符(我已经确认这确实有效),但我想了解我缺少什么。以文本模式打开文件也可以防止问题发生,但不适用于我的用例,也不能解释二进制情况下发生的情况。

我已经在许多 Linux 系统和 OS X 以及各种本地和远程文件系统上运行了以下复制器。我总是得到很多错误的文件位置和至少几个校验和错误。我知道读取并不能保证读取请求的全部数据量,但我已经确认这不是这里发生的情况,并省略了该代码以保持简洁。

import argparse
import multiprocessing
import random
import string

def worker(worker, args):
    rng = random.Random(1234 + worker)
    for i in range(args.count):
        block = rng.randrange(args.blockcount)
        start = block * args.blocksize
        with args.lock:
            args.fd.seek(start)
            data = args.fd.read(args.blocksize)
            pos = args.fd.tell()
            if pos != start + args.blocksize:
                print(i, "bad file position", start, start + args.blocksize, pos)
            cksm = sum(data)
            if cksm != args.cksms[block]:
                print(i, "bad checksum", cksm, args.cksms[block])

args = argparse.Namespace()
args.file = '/tmp/text'
args.count = 1000
args.blocksize = 1000
args.blockcount = args.count
args.filesize = args.blocksize * args.blockcount
args.num_workers = 4

args.cksms = multiprocessing.Array('i', [0] * args.blockcount)
with open(args.file, 'w') as f:
    for i in range(args.blockcount):
        data = ''.join(random.choice(string.ascii_lowercase) for x in range(args.blocksize))
        args.cksms[i] = sum(data.encode())
        f.write(data)
args.fd = open(args.file, 'rb')  
args.lock = multiprocessing.Lock()

procs = []
for i in range(args.num_workers):
    p = multiprocessing.Process(target=worker, args=(i, args))
    procs.append(p)
    p.start()

示例输出:

$ python test.py
158 bad file position 969000 970000 741000
223 bad file position 908000 909000 13000
232 bad file position 679000 680000 960000
263 bad file position 959000 960000 205000
390 bad file position 771000 772000 36000
410 bad file position 148000 149000 42000
441 bad file position 677000 678000 21000
459 bad file position 143000 144000 636000
505 bad file position 579000 580000 731000
505 bad checksum 109372 109889
532 bad file position 962000 963000 243000
494 bad file position 418000 419000 2000
569 bad file position 266000 267000 991000
752 bad file position 732000 733000 264000
840 bad file position 801000 802000 933000
799 bad file position 332000 333000 989000
866 bad file position 150000 151000 248000
866 bad checksum 109116 109375
887 bad file position 39000 40000 974000
937 bad file position 18000 19000 938000
969 bad file position 20000 21000 24000
953 bad file position 542000 543000 767000
977 bad file position 694000 695000 782000

【问题讨论】:

    标签: python file-io synchronization multiprocessing python-multiprocessing


    【解决方案1】:

    这似乎是由缓冲引起的:使用 open(args.file, 'rb', buffering=0) 我无法再重现。

    https://docs.python.org/3/library/functions.html#open

    buffering 是一个可选整数,用于设置缓冲策略。传递 0 以关闭缓冲 [...] 当没有给出缓冲参数时,默认缓冲策略的工作方式如下: [...] 二进制文件以固定大小的块进行缓冲;缓冲区的大小 [...] 通常为 4096 或 8192 字节长。 [...]

    【讨论】:

    • 有意思,谢谢!即使打开了缓冲,原始代码似乎仍然可以工作(尽管缓冲无济于事),但这无疑提供了确凿的证据。
    【解决方案2】:

    我已经检查过了,只使用了 multiprocessing.Lock(没有缓冲 = 0),仍然遇到了bad data。使用multiprocessing.Lockbuffering=0,一切顺利

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-04-14
      • 2011-08-14
      • 2016-03-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多