在您的场景中处理大文件的一种策略是将其分块上传到 Amazon S3 存储桶。
您可以使用boto3 库中提供的MultipartUpload API。如果您想在上传后将块连接到单个文件,我建议使用这种方式。
话虽如此,如果您认为不需要在上传后将块合并为单个文件,则可以实现一个朴素的写入器。
这样一个朴素的写入器可以按如下方式将您的集合中的文档分块上传。
请注意,您可以对此进行改进以处理错误并执行重试。您还可以并行运行上传。
import json
import io
import boto3
from pymongo import MongoClient
class ObjectChunkIO:
    """Base class for chunked object I/O against an S3 bucket.

    Subclasses must implement ``chunk_iterator``; the base implementation
    raises, so only subclasses can be instantiated.
    """

    def __init__(self, bucket, prefix):
        """Remember the target bucket/prefix and prime the chunk iterator.

        Args:
            bucket: s3.Bucket the chunks are read from / written to.
            prefix: key prefix identifying the chunk objects.
        """
        self.bucket = bucket
        self.prefix = prefix
        # Created eagerly so a missing chunk_iterator fails fast.
        self.iterator = self.chunk_iterator()

    def next_chunk(self):
        """Advance the underlying iterator and return the next chunk."""
        chunk = next(self.iterator)
        return chunk

    def chunk_iterator(self):
        """Yield successive chunks; subclasses must override this."""
        raise NotImplementedError
将 MongoDb 集合作为块上传到 Amazon S3 存储桶
class ObjectChunkWriter(ObjectChunkIO):
    """Writes documents from an iterable (e.g. a pymongo cursor) to S3
    as a sequence of newline-delimited JSON chunk objects.

    Each uploaded object is at most ``chunk_size`` bytes (unless a single
    document exceeds it) and is keyed ``<prefix>.<NNN>.json``.
    """

    def __init__(self, bucket, prefix, chunk_size=5e7):
        """
        Args:
            bucket: s3.Bucket the chunk objects are uploaded to.
            prefix: key prefix for the uploaded objects.
            chunk_size: target object size in bytes; defaults to 50 MB
                (S3 single put_object uploads are capped at 5 GB).
        """
        super().__init__(bucket, prefix)
        self.chunk_num = 0
        self.chunk_size = chunk_size

    def _upload_buf(self, object_data):
        # put_object uploads the whole chunk in a single request.
        self.bucket.put_object(Body=object_data, Key=self.path())

    def chunk_iterator(self):
        """Yield a fresh in-memory buffer per chunk.

        Resuming the generator closes the previous buffer and bumps
        ``chunk_num`` so ``path()`` names the next object correctly.
        """
        while True:
            buffer = io.BytesIO()
            yield buffer
            buffer.close()
            self.chunk_num += 1

    def path(self):
        # Zero-padded sequence number keeps keys lexicographically sorted.
        return '{prefix}.{0:03d}.json'.format(
            self.chunk_num, prefix=self.prefix)

    def write(self, cur):
        """Stream documents from ``cur`` into chunked S3 objects.

        Args:
            cur: iterable of JSON-serializable documents
                (e.g. pymongo.cursor.Cursor).
        """
        buf = self.next_chunk()
        try:
            for doc in cur:
                b_doc = json.dumps(doc).encode('utf-8')
                # Flush before this doc would push the chunk past the limit.
                if buf.tell() + len(b_doc) > self.chunk_size:
                    self._upload_buf(buf.getvalue())
                    buf = self.next_chunk()
                buf.writelines([b_doc, b'\n'])
            # Fix: only upload the trailing chunk if it has content,
            # so an empty cursor no longer creates a zero-byte object.
            if buf.tell():
                self._upload_buf(buf.getvalue())
        finally:
            # Fix: the original `with self.next_chunk() as buf:` closed the
            # first (already-closed) buffer, leaking the last one; close the
            # buffer we actually ended up holding.
            buf.close()
可以这样使用:
def get_collection(db_name='<database>', collection_name='<collection>'):
    """Return a pymongo collection handle.

    Fix: the original used ``db_client.<database>`` placeholder syntax,
    which is not valid Python; the names are now parameters (replace the
    placeholder defaults with your database/collection names).

    Args:
        db_name: MongoDB database name.
        collection_name: collection name within the database.

    Returns:
        pymongo.collection.Collection
    """
    db_client = MongoClient()
    db = db_client[db_name]
    return db[collection_name]
def get_bucket():
    """Return the target s3.Bucket resource.

    Uses the named AWS credentials profile; replace the '<profile>' and
    '<bucket>' placeholders before running.
    """
    aws_session = boto3.Session(profile_name='<profile>')
    s3 = aws_session.resource('s3')
    return s3.Bucket('<bucket>')
def upload_in_chunks(cur, bucket, prefix='streaming/sample-output/chunk'):
    '''Uploads documents returned by a MongoDB cursor in chunks to an S3 bucket.

    Args:
        - cur: pymongo.cursor.Cursor yielding the documents to upload
        - bucket: s3.Bucket the chunk objects are written to
          (fix: the docstring previously documented a nonexistent
          ``collection`` parameter)
        - prefix: key prefix for the uploaded chunk objects
          (generalized from a hard-coded constant; default unchanged)
    '''
    object_chunk_writer = ObjectChunkWriter(bucket, prefix=prefix)
    object_chunk_writer.write(cur)
# Driver: stream every document in the collection to S3 in chunks.
collection = get_collection()
# find() with no filter iterates the whole collection lazily.
cur = collection.find()
bucket = get_bucket()
upload_in_chunks(cur, bucket)
将上传的块读入 MongoDb 集合
读取上传的块是相当容易的部分。
class ObjectChunkReader(ObjectChunkIO):
    """Reads chunk objects previously uploaded under a key prefix and
    yields each object's contents as a list of decoded JSON documents.
    """

    def chunk_iterator(self):
        """Yield one list of documents per S3 object under the prefix."""
        # NOTE(review): MaxKeys caps the page size of each listing request;
        # the resource collection still paginates through all matching
        # keys — confirm against the boto3 collections docs.
        listing = self.bucket.objects.filter(Prefix=self.prefix, MaxKeys=3)
        for summary in listing:
            body = summary.get()['Body']
            # Each line is one newline-delimited JSON document.
            yield [json.loads(raw_line) for raw_line in body.iter_lines()]

    def __iter__(self):
        yield from self.iterator
def write_objects_to_collection(bucket, collection,
                                prefix='streaming/sample-output/chunk'):
    ''' Writes s3 objects to a MongoDb collection

    Args:
        - bucket: s3.Bucket
        - collection: pymongo.collection.Collection
        - prefix: key prefix of the chunk objects to read
          (generalized from a hard-coded constant; default unchanged)
    '''
    object_chunk_reader = ObjectChunkReader(bucket, prefix=prefix)
    for object_chunk in object_chunk_reader:
        # Fix: insert_many raises InvalidOperation on an empty list;
        # a zero-byte chunk object (e.g. from an empty upload) decodes
        # to an empty list, so skip it.
        if object_chunk:
            collection.insert_many(object_chunk)
# Driver: read the previously uploaded chunk objects back into MongoDB.
collection = get_collection()
bucket = get_bucket()
write_objects_to_collection(bucket, collection)