【Posted】: 2022-01-06 16:24:18
【Problem description】:
I currently have a location in our s3 bucket where, at the busiest times, up to 6 million files per hour need to be archived. The current logic looks like this:
import boto3

def get_responses(bucket, prefix):
    """
    Get the object listing for a given directory on S3.

    Args:
        bucket (str): name of the S3 bucket
        prefix (str): directory (key prefix) within the bucket

    Returns:
        list of object entries from the S3 responses
    """
    client = boto3.client(
        "s3", aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY)
    continuation_token = None
    responses = []
    # List objects under the prefix until the response is no longer truncated
    while True:
        list_kwargs = dict(
            Bucket=bucket, Prefix=prefix, MaxKeys=1000)
        # Add the continuation token if we have one
        if continuation_token:
            list_kwargs['ContinuationToken'] = continuation_token
        response = client.list_objects_v2(**list_kwargs)
        # Accumulate valid responses and update the continuation token
        if 'Contents' in response:
            responses += response['Contents']
        # Exit the while loop once we have reached the end of the listing
        if not response.get('IsTruncated'):
            break
        continuation_token = response.get('NextContinuationToken')
    return responses
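For reference, the continuation-token pattern used above can be exercised offline against a stub. `FakeS3Client` below is hypothetical (not a real boto3 class); it only mimics the paged, truncated responses that `list_objects_v2` returns:

```python
class FakeS3Client:
    """Hypothetical stand-in for boto3's S3 client: serves `keys` in small
    pages the way list_objects_v2 does, so the loop can run offline."""

    def __init__(self, keys, page_size=3):
        self.keys = sorted(keys)
        self.page_size = page_size

    def list_objects_v2(self, Bucket, Prefix, MaxKeys, ContinuationToken=None):
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken or 0)
        end = start + min(MaxKeys, self.page_size)
        resp = {"Contents": [{"Key": k} for k in matching[start:end]],
                "IsTruncated": end < len(matching)}
        if resp["IsTruncated"]:
            resp["NextContinuationToken"] = str(end)
        return resp

def collect(client, bucket, prefix):
    # Same continuation-token loop as get_responses, with the client injected.
    token, out = None, []
    while True:
        kwargs = dict(Bucket=bucket, Prefix=prefix, MaxKeys=1000)
        if token:
            kwargs["ContinuationToken"] = token
        resp = client.list_objects_v2(**kwargs)
        out += resp.get("Contents", [])
        if not resp.get("IsTruncated"):
            break
        token = resp.get("NextContinuationToken")
    return out

client = FakeS3Client([f"archive/{i}" for i in range(10)])
print(len(collect(client, "my-bucket", "archive/")))  # 10
```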
As you can see, this iterates 1,000 files at a time, which is the current limit of list_objects_v2. In the worst case this takes about half an hour and slows down the whole job, which ideally should finish within the hour. Is there a way to parallelize this so that each thread can list 1,000 files at a time?
(All of the files live under a single prefix and cannot be split up.)
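One direction worth exploring is fanning the listing out over per-character sub-prefixes, since S3's `Prefix` can be any key prefix, not just a "directory". This is a sketch under the assumption that key names after the prefix begin with a known set of characters (hex digits here); `list_paged` is a hypothetical stand-in for the real `list_objects_v2` pagination loop:

```python
from concurrent.futures import ThreadPoolExecutor

def list_paged(all_keys, prefix):
    # Hypothetical stand-in for a full list_objects_v2 pagination loop:
    # returns every key that starts with `prefix`.
    return [k for k in all_keys if k.startswith(prefix)]

def parallel_list(all_keys, prefix, alphabet="0123456789abcdef", workers=8):
    """Fan one listing out over `prefix + c` for each character that key
    names can start with; each sub-prefix is listed on its own thread."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        chunks = pool.map(lambda c: list_paged(all_keys, prefix + c), alphabet)
    # Flatten the per-sub-prefix results back into one list
    return [key for chunk in chunks for key in chunk]

keys = [f"logs/{i:02x}-file" for i in range(32)]
print(len(parallel_list(keys, "logs/")))  # 32
```

Note the caveat: keys whose first character after the prefix falls outside the chosen alphabet would be missed, so the alphabet must cover every character the key names can actually start with.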
【Discussion】: