【问题标题】:Make Boto3 upload calls blocking (single threaded)使 Boto3 上传调用阻塞(单线程)
【发布时间】:2019-04-22 18:52:48
【问题描述】:

编辑:我最初的假设被证明部分错误。我在这里添加了一个冗长的答案,我邀请其他人对此进行压力测试和更正。


我正在寻找一种以单线程方式利用 Boto3 S3 API 来模拟线程安全键值存储的方法。简而言之,我想使用调用线程而不是新线程来进行上传。

据我所知,Boto3(或.upload_file())中.upload_fileobj() 方法的默认行为是将任务启动到新线程并立即返回None

来自docs

这是一种托管传输,如有必要,它将在多个线程中执行分段上传。

(如果我对此的理解首先是错误的,那么对此进行更正也会有所帮助。这是在 Boto3 1.9.134 中。)

>>> import io
>>> import boto3
>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')
>>> buf = io.BytesIO(b"test")
>>> res = bucket.upload_fileobj(buf, 'testobj')
>>> res is None
True

现在,假设buf 不是一个短的 4 字节字符串,而是一个巨大的文本 blob,需要相当长的时间才能完全上传。

我还使用这个函数来检查是否存在具有给定键的对象:

def key_exists_in_bucket(bucket_obj, key: str) -> bool:
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True

如果对象按名称存在,我的意图是不重写它。

这里的竞争条件相当明显:启动异步上传,然后快速检查key_exists_in_bucket(),如果对象仍在写入,则返回False,然后不必要地再次写入那个。

有没有办法确保bucket.upload_fileobj()当前线程调用,而不是在该方法范围内创建的新线程?

我意识到这会减慢速度。在这种情况下,我愿意牺牲速度。

【问题讨论】:

  • 我过去曾使用 S3 客户端和资源级 API 上传文件,但它们都阻止了 iirc。你确定这甚至是一个问题?

标签: python boto3


【解决方案1】:

我想,由于这个问题的答案和another similar question 似乎直接冲突,最好直接使用pdb 找到源。

总结

  • boto3 确实默认使用多线程 (10)
  • 但是,它不是异步的,因为它在返回之前等待(加入)这些线程,而不是使用“即发即弃”技术
  • 因此,以这种方式,如果您尝试从多个客户端与 s3 存储桶通信,则读/写线程安全就位。

详情

我在这里努力解决的一个方面是多个(子线程)确实暗示顶级方法本身是非阻塞的:如果调用线程开始上传到多个子线程,但是等待这些线程完成并返回,我敢说这仍然是一个阻塞调用。另一方面,如果方法调用在asyncio 中是一个“即发即弃”的调用。对于threading,这实际上归结为是否曾经调用过x.join()

这是从 Victor Val 获取的用于启动调试器的初始代码:

import io
import pdb

import boto3

# From dd if=/dev/zero of=100mb.txt  bs=50M  count=1
buf = io.BytesIO(open('100mb.txt', 'rb').read())
bucket = boto3.resource('s3').Bucket('test-threads')
pdb.run("bucket.upload_fileobj(buf, '100mb')")

此堆栈帧来自 Boto 1.9.134。

现在跳转到pdb

.upload_fileobj() 首先调用一个嵌套方法 -- 还没有太多可看的。

(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
(Pdb) s

(Pdb) l
574     
575         :type Config: boto3.s3.transfer.TransferConfig
576         :param Config: The transfer configuration to be used when performing the
577             upload.
578         """
579  ->     return self.meta.client.upload_fileobj(
580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
581             Callback=Callback, Config=Config)
582     
583     
584  

所以顶级方法确实返回了 something,但目前尚不清楚该东西最终如何变成None

所以我们开始了。

现在,.upload_fileobj() 确实有一个 config 参数,默认为 None:

(Pdb) l 531
526     
527         subscribers = None
528         if Callback is not None:
529             subscribers = [ProgressCallbackInvoker(Callback)]
530     
531         config = Config
532         if config is None:
533             config = TransferConfig()
534     
535         with create_transfer_manager(self, config) as manager:
536             future = manager.upload(

这意味着config 成为默认TransferConfig()

  • use_threads -- 如果为 True,则执行 S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。
  • max_concurrency -- 发出请求以执行传输的最大线程数。如果 use_threads 设置为 False,则忽略提供的值,因为传输只会使用主线程。

哇啦,他们来了:

(Pdb) unt 534
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
-> with create_transfer_manager(self, config) as manager:
(Pdb) config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) config.use_threads
True
(Pdb) config.max_concurrency
10

现在我们在调用堆栈中下降一个级别以使用TransferManager(上下文管理器)。此时,max_concurrency 已被用作同名 max_request_concurrency 的参数:

# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223

    # The executor responsible for making S3 API transfer requests
    self._request_executor = BoundedExecutor(
        max_size=self._config.max_request_queue_size,
        max_num_threads=self._config.max_request_concurrency,
        tag_semaphores={
            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                self._config.max_in_memory_upload_chunks),
            IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                self._config.max_in_memory_download_chunks)
        },
        executor_cls=executor_cls
    )

至少在这个 boto3 版本中,该类来自单独的库 s3transfer

(Pdb) n
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(536)upload_fileobj()
-> future = manager.upload(
(Pdb) manager
<s3transfer.manager.TransferManager object at 0x7f178db437f0>
(Pdb) manager._config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) manager._config.use_threads
True
(Pdb) manager._config.max_concurrency
10

接下来,让我们进入manager.upload()。这是该方法的全文:

(Pdb) l 290, 303
290  ->         if extra_args is None:
291                 extra_args = {}
292             if subscribers is None:
293                 subscribers = []
294             self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
295             call_args = CallArgs(
296                 fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
297                 subscribers=subscribers
298             )
299             extra_main_kwargs = {}
300             if self._bandwidth_limiter:
301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302             return self._submit_transfer(
303                 call_args, UploadSubmissionTask, extra_main_kwargs)

(Pdb) unt 301
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(302)upload()
-> return self._submit_transfer(
(Pdb) extra_main_kwargs
{}

(Pdb) UploadSubmissionTask
<class 's3transfer.upload.UploadSubmissionTask'>
(Pdb) call_args
<s3transfer.utils.CallArgs object at 0x7f178db5a5f8>

(Pdb) l 300, 5
300             if self._bandwidth_limiter:
301                 extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
302  ->         return self._submit_transfer(
303                 call_args, UploadSubmissionTask, extra_main_kwargs)
304     
305         def download(self, bucket, key, fileobj, extra_args=None,

啊,太棒了——所以我们至少需要再往下一层才能看到实际的底层上传。

(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(303)upload()
-> call_args, UploadSubmissionTask, extra_main_kwargs)
(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(438)_submit_transfer()
-> def _submit_transfer(self, call_args, submission_task_cls,
(Pdb) s
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/s3transfer/manager.py(440)_submit_transfer()
-> if not extra_main_kwargs:

(Pdb) l 440, 10
440  ->         if not extra_main_kwargs:
441                 extra_main_kwargs = {}
442     
443             # Create a TransferFuture to return back to the user
444             transfer_future, components = self._get_future_with_components(
445                 call_args)
446     
447             # Add any provided done callbacks to the created transfer future
448             # to be invoked on the transfer future being complete.
449             for callback in get_callbacks(transfer_future, 'done'):
450                 components['coordinator'].add_done_callback(callback)

好的,所以现在我们有一个TransferFuture,在s3transfer/futures.py 中定义没有明确的证据表明线程已经启动,但是当futures 参与时听起来确实如此。

(Pdb) l
444             transfer_future, components = self._get_future_with_components(
445                 call_args)
446     
447             # Add any provided done callbacks to the created transfer future
448             # to be invoked on the transfer future being complete.
449  ->         for callback in get_callbacks(transfer_future, 'done'):
450                 components['coordinator'].add_done_callback(callback)
451     
452             # Get the main kwargs needed to instantiate the submission task
453             main_kwargs = self._get_submission_task_main_kwargs(
454                 transfer_future, extra_main_kwargs)
(Pdb) transfer_future
<s3transfer.futures.TransferFuture object at 0x7f178db5a780>

下面的最后一行来自TransferCoordinator 类,乍一看似乎很重要:

class TransferCoordinator(object):
    """A helper class for managing TransferFuture"""
    def __init__(self, transfer_id=None):
        self.transfer_id = transfer_id
        self._status = 'not-started'
        self._result = None
        self._exception = None
        self._associated_futures = set()
        self._failure_cleanups = []
        self._done_callbacks = []
        self._done_event = threading.Event()  # < ------ !!!!!!

您通常会看到 threading.Event being used for one thread to signal 一个事件状态,而其他线程可能正在等待该事件发生。

TransferCoordinatorused by TransferFuture.result()

好的,从上面绕回来,我们现在位于s3transfer.futures.BoundedExecutor 及其max_num_threads 属性:

class BoundedExecutor(object):
    EXECUTOR_CLS = futures.ThreadPoolExecutor
    # ...
    def __init__(self, max_size, max_num_threads, tag_semaphores=None,
                 executor_cls=None):
    self._max_num_threads = max_num_threads
    if executor_cls is None:
        executor_cls = self.EXECUTOR_CLS
    self._executor = executor_cls(max_workers=self._max_num_threads)

这基本上是equivalent to:

from concurrent import futures

_executor = futures.ThreadPoolExecutor(max_workers=10)

但还有一个问题:这是“一劳永逸”,还是调用实际上等待线程完成并返回?

似乎是后者。 .result() 打电话给self._done_event.wait(MAXINT)

# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/futures.py#L249

def result(self):
    self._done_event.wait(MAXINT)

    # Once done waiting, raise an exception if present or return the
    # final result.
    if self._exception:
        raise self._exception
    return self._result

最后,重新运行 Victor Val 的测试,这似乎证实了上述情况:

>>> import boto3
>>> import time
>>> import io
>>> 
>>> buf = io.BytesIO(open('100mb.txt', 'rb').read())
>>> 
>>> bucket = boto3.resource('s3').Bucket('test-threads')
>>> start = time.time()
>>> print("starting to upload...")
starting to upload...
>>> bucket.upload_fileobj(buf, '100mb')
>>> print("finished uploading")
finished uploading
>>> end = time.time()
>>> print("time: {}".format(end-start))
time: 2.6030001640319824

(此示例在网络优化实例上运行时,此执行时间可能会更短。但 2.5 秒仍然是一个明显的大块时间,并且根本不表示线程被启动并且没有等待。)


最后,这是一个Callback 的示例,用于.upload_fileobj()。它与文档中的an example 一起出现。

首先,一个小帮手可以有效地获取缓冲区的大小:

def get_bufsize(buf, chunk=1024) -> int:
    start = buf.tell()
    try:
        size = 0 
        while True: 
            out = buf.read(chunk) 
            if out: 
                size += chunk 
            else: 
                break
        return size
    finally:
        buf.seek(start)

类本身:

import os
import sys
import threading
import time

class ProgressPercentage(object):
    def __init__(self, filename, buf):
        self._filename = filename
        self._size = float(get_bufsize(buf))
        self._seen_so_far = 0
        self._lock = threading.Lock()
        self.start = None

    def __call__(self, bytes_amount):
        with self._lock:
            if not self.start:
                self.start = time.monotonic()
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s of %s  (%.2f%% done, %.2fs elapsed\n" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage, time.monotonic() - self.start))
            # Use sys.stdout.flush() to update on one line
            # sys.stdout.flush()

例子:

In [19]: import io 
    ...:  
    ...: from boto3.session import Session 
    ...:  
    ...: s3 = Session().resource("s3") 
    ...: bucket = s3.Bucket("test-threads") 
    ...: buf = io.BytesIO(open('100mb.txt', 'rb').read()) 
    ...:  
    ...: bucket.upload_fileobj(buf, 'mykey', Callback=ProgressPercentage("mykey", buf))                                                                                                                                                                      
mykey  262144 of 104857600.0  (0.25% done, 0.00s elapsed
mykey  524288 of 104857600.0  (0.50% done, 0.00s elapsed
mykey  786432 of 104857600.0  (0.75% done, 0.01s elapsed
mykey  1048576 of 104857600.0  (1.00% done, 0.01s elapsed
mykey  1310720 of 104857600.0  (1.25% done, 0.01s elapsed
mykey  1572864 of 104857600.0  (1.50% done, 0.02s elapsed

【讨论】:

  • 伟大的工作布拉德!我在看callbacks。您认为这可能是检查在调用上传密钥期间是否已上传相同密钥的有效方法吗?
  • 有趣,会玩这个。看起来它可以用于那个。
  • 哇!我发现 use_threads 参数确实决定了使用(或不使用)多线程。此外,无论使用多个线程还是单个线程,调用都是阻塞的,因为主线程等待所有线程加入。
  • 在此处为upload_fileobj @VictorVal 添加了一个回调示例。但我认为它仍然引出了一个问题——单个客户端能否实现服务器端锁定。我不确定。
【解决方案2】:

upload_fileobj 接受一个 Config 参数。这是一个boto3.s3.transfer.TransferConfig 对象,它又具有一个名为use_threads 的参数(默认为true) - 如果为True,则执行S3 传输时将使用线程。如果为 False,则不会使用线程来执行传输:所有逻辑都将在主线程中运行。

希望这对你有用。

【讨论】:

  • 哇,我使用这个 API 已经有 9 个月了,我什至不知道它可以使用多线程。你知道它在什么条件下会这样做吗?
  • 它将在所有条件下执行,除非设置为 false see here
  • @jmkmay 会一直使用多线程;如果您不想要多线程行为,则需要显式创建一个 TransferConfig 对象,并将 use_threads 设置为 False 并将其传递给 upload_fileobj 函数
  • @Verma 你确定吗?文档说“这是一种托管传输,如有必要,它将在多个线程中执行分段上传。”我知道其他 S3 操作只会执行超过一定大小阈值的 PUT 的多部分。
  • @BradSolomon 文档和我看到的任何代码似乎都表明可以控制单线程或多线程执行。我所做的测试似乎表明执行总是在主线程上。试图更深入地挖掘代码,看看我是否能找到它的底部。
【解决方案3】:

测试方法是否阻塞
我自己凭经验测试了这种行为。首先,我生成了一个 100MB 的文件:

dd if=/dev/zero of=100mb.txt  bs=100M  count=1

然后我尝试以与您相同的方式上传文件并测量花费的时间:

import boto3
import time
import io
file = open('100mb.txt', 'rb')
buf = io.BytesIO(file.read())
bucket = boto3.resource('s3').Bucket('testbucket')
start = time.time()
print("starting to upload...")
bucket.upload_fileobj(buf, '100mb')
print("finished uploading")
end = time.time()
print("time: {}".format(end-start))

upload_fileobj() 方法完成和读取下一个 python 行需要 8 秒以上(1gb 文件需要 50 秒),所以 我认为这个方法是阻塞的

使用线程测试

当使用多个线程时,我可以验证该方法是否同时支持多个传输即使使用选项 use_threads=False。我开始上传一个 200mb 的文件,然后是一个 100mb 的文件,然后 100mb 的文件首先完成。这证实了 TransferConfig 中的并发与多部分传输有关。

代码:

import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading

config = TransferConfig(use_threads=False)

bucket = boto3.resource('s3').Bucket('testbucket')
def upload(filename):
     file = open(filename, 'rb')
     buf = io.BytesIO(file.read())
     start = time.time()
     print("starting to upload file {}".format(filename))
     bucket.upload_fileobj(buf,filename,Config=config)
     end = time.time()
     print("finished uploading file {}. time: {}".format(filename,end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()

输出:

开始上传文件 200mb.txt
开始上传文件 100mb.txt
完成上传文件 100mb.txt。时间:46.35254502296448
完成上传文件 200mb.txt。时间:61.70564889907837

使用会话进行测试
如果您希望上传方法按调用顺序完成,这就是您所需要的。

代码:

import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading

config = TransferConfig(use_threads=False)

session = boto3.session.Session()
s3 = session.resource('s3')
bucket = s3.Bucket('testbucket')
def upload(filename):
     file = open(filename, 'rb')
     buf = io.BytesIO(file.read())
     start = time.time()
     print("starting to upload file {}".format(filename))
     bucket.upload_fileobj(buf,filename)
     end = time.time()
     print("finished uploading file {}. time: {}".format(filename,end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()

输出:

开始上传文件 200mb.txt
开始上传文件 100mb.txt
完成上传文件 200mb.txt。时间:46.62478971481323
完成上传文件 100mb.txt。时间:50.515950202941895

我找到的一些资源:
- This 是在 SO 中提出的关于阻塞或非阻塞方法的问题。这不是决定性的,但那里可能有相关信息。
- 在 GitHub 上有一个开放的issue 允许在 boto3 中进行异步传输。
- 还有像 aiobotoaiobotocore 这样的工具,专门用于允许从/到 s3 和其他 aws 服务的异步下载和上传。

关于我之前的回答
您可以阅读 here 关于 boto3 中的文件传输配置。特别是:

传输操作使用线程来实现并发。线程使用 可以通过将 use_threads 属性设置为 False 来禁用。

最初我认为这与同时执行的多个传输有关。但是阅读source code 使用 TransferConfig 时参数 max_concurrency 中的注释解释说并发不是指多次传输,而是指 “将请求执行传输的线程数”。所以这是用来加速传输的东西。 use_threads 属性仅用于允许多部分传输中的并发性。

【讨论】:

  • 我对其进行了测试,上传阻止了我的执行。我编辑了我的答案。
  • 谢谢布拉德。我在玩线程和会话。显然,当我创建一个会话对象时,我每次都可以执行上传。奇怪的是,完成 100mb 上传的时间只是在 200mb 之后几秒钟,所以 100mb 上传可能在 200mb 完成之前开始,但它总是等待 200mb 完成后再完成。我试过使用s3 = boto3.client('s3', config=botocore.client.Config(max_pool_connections=1)) 并且行为没有改变
  • 注意:您应该在多个线程之间共享相同的s3.Bucket。请参阅boto3 docs。我们以前对此感到厌烦,现在小心翼翼地为每个工作线程创建一个新的Session
猜你喜欢
  • 2013-11-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-07-06
相关资源
最近更新 更多