【问题标题】:Memory-efficient large dataset streaming to S3内存高效的大型数据集流式传输到 S3
【发布时间】:2017-12-02 09:42:58
【问题描述】:

我正在尝试使用 SQL alchemy 复制 S3 大型数据集(大于 RAM)。 我的限制是:

  1. 我需要使用 sqlalchemy
  2. 我需要将内存压力保持在最低水平
  3. 我不想使用本地文件系统作为中间步骤将数据发送到 s3

我只是想以一种内存高效的方式将数据从 DB 传输到 S3

我可以正常使用数据集(使用下面的逻辑),但使用更大的数据集我遇到了缓冲区问题。

我解决的第一个问题是执行查询通常会在内存中缓冲结果。我使用 fetchmany() 方法。

engine = sqlalchemy.create_engine(db_url)
engine.execution_options(stream_results=True)

results=engine.execute('SELECT * FROM tableX;')
while True:
  chunk = result.fetchmany(10000)
  if not chunk:
    break

另一方面,我有一个 StringIO 缓冲区,我通过 fetchmany 数据检查提供该缓冲区。然后我将其内容发送到 s3。

from io import StringIO
import boto3
import csv

s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

我遇到的问题本质上是一个设计问题,我如何使这些部分协同工作。甚至可能在同一个运行时?

engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')

engine.execution_options(stream_results=True)
results=engine.execute('SELECT * FROM tableX;')
while True:
    chunk = result.fetchmany(10000)
    csv_writer = csv.writer(csv_buffer, delimiter=';')
    csv_writer.writerows(chunk)
    s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
    if not chunk:
        break

我可以让它在一个 fetchmany 周期内工作,但不能在几个周期内工作。有什么想法吗?

【问题讨论】:

    标签: database amazon-s3 sqlalchemy buffer python-3.6


    【解决方案1】:

    我假设“使这些部分一起工作”是指您想要 S3 中的单个文件而不仅仅是部分?您需要做的就是创建一个文件对象,当读取该文件对象时,它将发出下一个批次的查询并缓冲该对象。我们可以使用 python 的生成器:

    def _generate_chunks(engine):
        with engine.begin() as conn:
            conn = conn.execution_options(stream_results=True)
            results = conn.execute("")
            while True:
                chunk = results.fetchmany(10000)
                if not chunk:
                    break
                csv_buffer = StringIO()
                csv_writer = csv.writer(csv_buffer, delimiter=';')
                csv_writer.writerows(chunk)
                yield csv_buffer.getvalue().encode("utf-8")
    

    这是一个文件块的流,所以我们需要做的就是将它们拼接在一起(当然是懒惰的)成一个文件对象:

    class CombinedFile(io.RawIOBase):
        def __init__(self, strings):
            self._buffer = ""
            self._strings = iter(strings)
    
        def read(self, size=-1):
            if size < 0:
                return self.readall()
            if not self._buffer:
                try:
                    self._buffer = next(self._strings)
                except StopIteration:
                    pass
            if len(self._buffer) > size:
                ret, self._buffer = self._buffer[:size], self._buffer[size:]
            else:
                ret, self._buffer = self._buffer, b""
            return ret
    
    chunks = _generate_chunks(engine)
    file = CombinedFile(chunks)
    upload_file_object_to_s3(file)
    

    将文件对象流式传输到 S3 留给读者作为练习。 (你大概可以使用put_object。)

    【讨论】:

    • 太棒了!明天我会用真实数据进行测试并给出反馈。同时,我在想:对数据收集部分进行子处理并将结果连续发送到s3会不会更快?因为使用讨论的方法,过程是:(拉1个数据块->发送1个数据块)x n次,对吗?因此,如果我做对了,当应用程序将数据发送到 s3 时,它会中断收集新数据吗?
    • @Breathe 是的,您可以通过流水线提高效率,但您不必为此使用多处理。由于您使用的是 Python 3.6,我建议使用 async/await 和两个任务,一个发出查询以生成字节流,另一个使用该流并将其上传到 S3。
    • 非常感谢,上面的代码完美运行。成功后我会尝试设置第二种方法并发布代码。
    • 这太棒了!我一直在寻找关于在 Python 中实现流的解释,这是我见过的最好的。还有其他资源可以查看 Python 中的流吗?
    • @edan 不幸的是我不知道。在 Python 中处理流,尤其是异步处理,有点笨拙,需要像这样的手动工作,我不知道有什么库可以让它更容易。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-09
    相关资源
    最近更新 更多