【发布时间】:2017-12-02 09:42:58
【问题描述】:
我正在尝试使用 SQL alchemy 复制 S3 大型数据集(大于 RAM)。 我的限制是:
- 我需要使用 sqlalchemy
- 我需要将内存压力保持在最低水平
- 我不想使用本地文件系统作为中间步骤将数据发送到 s3
我只是想以一种内存高效的方式将数据从 DB 传输到 S3。
我可以正常使用数据集(使用下面的逻辑),但使用更大的数据集我遇到了缓冲区问题。
我解决的第一个问题是执行查询通常会在内存中缓冲结果。我使用 fetchmany() 方法。
engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
if not chunk:
break
另一方面,我有一个 StringIO 缓冲区,我通过 fetchmany 数据检查提供该缓冲区。然后我将其内容发送到 s3。
from io import StringIO
import boto3
import csv
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
我遇到的问题本质上是一个设计问题,我如何使这些部分协同工作。甚至可能在同一个运行时?
engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
chunk = result.fetchmany(10000)
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
if not chunk:
break
我可以让它在一个 fetchmany 周期内工作,但不能在几个周期内工作。有什么想法吗?
【问题讨论】:
标签: database amazon-s3 sqlalchemy buffer python-3.6