【问题标题】:Streaming a binary file to Google Storage using Tornado使用 Tornado 将二进制文件流式传输到 Google 存储
【发布时间】:2018-01-02 15:43:26
【问题描述】:

我正在尝试通过我的服务器将二进制文件从客户端请求流式传输到 Google Cloud Storage。

我正在使用 Tornado 框架将数据从请求流式传输到服务器,并使用 Google Cloud Storage API 将文件流式传输到 Google -upload_from_file 方法。

我是 Tornado 的新手,我正在使用 @stream_request_body 装饰器,因此我可以分块从请求中获取数据并将每个块上传到 Google。

我尝试打开一个类似文件的对象,在将“文件”本身上传到 Google 时,我可以将每个块写入该对象。

问题是在我开始向其写入块之前,我无法将“文件”上传到 Google。

任何帮助将不胜感激。

【问题讨论】:

  • "The problem is that I can't upload the 'file' to Google before I start writing chunks to it." 那么,你写完第一个块之后开始上传?或者你的意思是在所有的块都写完之前你不能上传文件?

标签: python file-upload stream google-cloud-storage tornado


【解决方案1】:

使用 Google 的 HTTP 库执行此操作很棘手,因为它们是为同步使用而设计的。您需要将实际上传放在另一个线程上以避免阻塞 IOLoop。您可以使用os.pipe 在 Tornado 线程和上传线程之间进行通信(将管道的写入端包装在 PipeIOStream 中,将读取端包装在 os.fdopen 中)。这是一个未经测试的解决方案草图:

def prepare(self):
    r, w = os.pipe()
    self.write_pipe = tornado.iostream.PipeIOStream(w)
    # Create our "file-like object" for upload_from_file
    self.read_pipe = os.fdopen(r)
    # Create an event for the upload thread to communicate back
    # to tornado when it's done, and save a reference to our IOLoop.
    self.upload_done = tornado.locks.Event()
    self.io_loop = tornado.ioloop.IOLoop.current()
    # Consider using a tornado.locks.Semaphore to limit the number of
    # threads you can create.
    self.thread = threading.Thread(target=self.upload_file)
    self.thread.start()

def upload_file(self):
    google_client.upload_from_file(self.read_pipe)
    # tell the IOLoop thread we're finished
    self.io_loop.add_callback(self.upload_done.set)

async def data_received(self, chunk):
    await self.write_pipe.write(chunk)

async def put(self):  # or post()
    self.write_pipe.close()
    await self.upload_done.wait()
    self.thread.join()
    self.render("upload_done.html")

或者,您可以避免使用 google 的同步库,并使用底层 HTTP API 和 AsyncHTTPClient 完成所有操作。以这种方式整理身份验证很棘手,但可以避免线程不匹配。这将涉及使用 body_producer,如 this gist

【讨论】:

  • 感谢您的回复。问题是我尝试了两种方式。你的和你附加的链接。两者都在同一阶段失败,即当我试图从管道中读取时,它仍然是空的。我怎样才能避免这种情况?我尝试在第一个块之后开始上传,但它仍然得到一个空流。
  • 正如我所说,我还没有实际测试过这个。但是从空管道读取应该阻塞而不是返回错误。它究竟是如何失败的?也许 upload_from_file 出于其他原因无法处理管道。但无论如何,要点中的版本应该可以工作 - 那里不涉及管道,虽然我还没有测试它是否可以上传到 GCS,但我已经测试了它通常会将传入的请求转发到另一台服务器。
  • 我认为读取工作正常,但在 upload_from_file 方法上,Google 的实现使用了不会阻塞的 stream.tell(),而是抛出“OSError: [Errno 29] Illegal seek”跨度>
  • 啊,我明白了。他们从不准确地解释他们需要从“类文件对象”中得到什么。因此,要使用upload_from_file,您需要将数据流式传输到实际文件,然后从那里上传。或者你可以使用 AsyncHTTPClient 版本。
猜你喜欢
  • 1970-01-01
  • 2010-12-03
  • 1970-01-01
  • 2016-02-11
  • 1970-01-01
  • 2012-05-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多