[Question Title]: Tornado: mime-type of the stream_request_body output
[Posted]: 2023-11-13 14:31:01
[Question]:

I am using tornado.web.stream_request_body (Tornado v4.0.1) to save POST data, but the mime type of the saved file always ends up as application/octet-stream.

I need to know the mime type of the uploaded file. Here is my code snippet:

import tempfile

import tornado.web
from tornado.web import stream_request_body

@stream_request_body
class MainHandler(tornado.web.RequestHandler):
    def post(self):
        pass

    def prepare(self):
        self.temp_file = tempfile.NamedTemporaryFile(delete=False)

    def data_received(self, chunk):
        self.temp_file.write(chunk)

Additional info: using curl with the --data-binary flag saves the file correctly:

curl -v -XPOST --data-binary @example.pdf localhost:8888/

But a multipart/form-data upload from a browser, or curl with the -d flag, does not work. When I open the saved file in a text editor, I see the HTTP POST parameters on the first lines:

-----------------------------192365691191582744435855330
Content-Disposition: form-data; name="upload"; filename="example.mp3"
Content-Type: audio/mpeg

...
unreadable binary data

Any ideas?

[Question discussion]:

    Tags: python-3.x mime-types tornado content-type


    [Solution 1]:

    UPDATE: I created a package for Python 3 and Tornado 4.0+ that can be installed from PyPI: https://pypi.python.org/pypi/tornadostreamform

    I know the previous answer has already been accepted, but I ran into the same problem, and I can offer a complete module for Python 3 -- let's call it post_streamer -- that parses any stream into its parts without using much memory.

    #!/usr/bin/env python3
    """Post data streamer for tornadoweb 4.0"""
    import os
    import re
    import random
    import tempfile
    
    class SizeLimitError(Exception):
        pass
    
    class PostDataStreamer:
        """Parse a stream of multpart/form-data.
    
        Useful for request handlers decorated with tornado.web.stream_request_body"""
        SEP = b"\r\n"
        LSEP = len(SEP)
        PAT_HEADERVALUE = re.compile(r"""([^:]+):\s+([^\s;]+)(.*)""")
        PAT_HEADERPARAMS = re.compile(r""";\s*([^=]+)=\"(.*?)\"(.*)""")
    
        # Encoding for the header values. Only header name and parameters
        # will be decoded. Streamed data will remain binary.
        # This is required because multipart/form-data headers cannot
        # be parsed without a valid encoding.
        header_encoding = "UTF-8"
    
        def __init__(self, total, tmpdir=None):
            self.buf = b""
            self.dlen = None
            self.delimiter = None
            self.in_data = False
            self.headers = []
            self.parts = []
            self.total = total
            self.received = 0
            self.tmpdir = tmpdir
    
        def _get_raw_header(self,data):
            idx = data.find(self.SEP)
            if idx>=0:
                return (data[:idx], data[idx+self.LSEP:])
            else:
                return (None, data)
    
        def receive(self, chunk):
            self.received += len(chunk)
            self.on_progress()
            self.buf += chunk
    
            if not self.delimiter:
                self.delimiter, self.buf = self._get_raw_header(self.buf)
                if self.delimiter:
                    self.delimiter+=self.SEP
                    self.dlen = len(self.delimiter)
                elif len(self.buf)>1000:
                    raise Exception("Cannot find multipart delimiter")
                else:
                    return
    
            while True:
                if self.in_data:
                    if (len(self.buf)>3*self.dlen):
                        idx = self.buf.find(self.SEP+self.delimiter)
                        if idx>=0:
                            self.feed_part(self.buf[:idx])
                            self.end_part()
                            self.buf = self.buf[idx+len(self.SEP+self.delimiter):]
                            self.in_data = False
                        else:
                            limit = len(self.buf)-2*self.dlen
                            self.feed_part(self.buf[:limit])
                            self.buf = self.buf[limit:]
                            return
                    else:
                        return
                if not self.in_data:
                    while True:
                        header, self.buf = self._get_raw_header(self.buf)
                        if header==b"":
                            assert(self.delimiter)
                            self.in_data = True
                            self.begin_part(self.headers)
                            self.headers = []
                            break
                        elif header:
                            self.headers.append(self.parse_header(header))
                        else:
                            # Header is None, not enough data yet
                            return
    
        def parse_header(self,header):
            header = header.decode(self.header_encoding)
            res = self.PAT_HEADERVALUE.match(header)
            if res:
                name,value,tail = res.groups()
                params = {}
                hdr = {"name":name,"value":value,"params":params}
                while True:
                    res = self.PAT_HEADERPARAMS.match(tail)
                    if not res:
                        break
                    fname,fvalue,tail = res.groups()
                    params[fname] = fvalue
                return hdr
            else:
                return {"value":header}
    
        def begin_part(self,headers):
            """Internal method called when a new part is started."""
            self.fout = tempfile.NamedTemporaryFile(
                dir=self.tmpdir, delete=False)
            self.part = {
                "headers":headers,
                "size":0,
                "tmpfile":self.fout
            }
            self.parts.append(self.part)
    
        def feed_part(self,data):
            """Internal method called when content is added to the current part."""
            self.fout.write(data)
            self.part["size"] += len(data)
    
        def end_part(self):
            """Internal method called when receiving the current part has finished."""
            # Will not close the file here, so we will be able to read later.
            #self.fout.close()            
            #self.fout.flush() This is not needed because we update part["size"]
            pass
    
        def finish_receive(self):
            """Call this after the last receive() call.
    
            You MUST call this before using the parts."""
            if self.in_data:
                idx = self.buf.rfind(self.SEP+self.delimiter[:-2])
                if idx>0:
                    self.feed_part(self.buf[:idx])
                self.end_part()
    
        def release_parts(self):
            """Call this to remove the temporary files."""
            for part in self.parts:
                part["tmpfile"].close()
                os.unlink(part["tmpfile"].name)
    
        def get_part_payload(self, part):
            """Return the contents of a part.
    
            Warning: do not use this for big files!"""
            fsource = part["tmpfile"]
            fsource.seek(0)
            return fsource.read()
    
        def get_part_ct_params(self, part):
            """Get content-disposition parameters for a part.

            If there is no content-disposition header then an empty
            dict is returned."""
            for header in part["headers"]:
                if header.get("name","").lower().strip()=="content-disposition":
                    return header.get("params",{})
            return {}
    
        def get_part_ct_param(self, part, pname, defval=None):
            """Get parameter for a part.
    
            @param part: The part
            @param pname: Name of the parameter, case insensitive
            @param defval: Value to return when not found.
            """
            ct_params = self.get_part_ct_params(part)
            for name in ct_params:
                if name.lower().strip()==pname:
                    return ct_params[name]
            return defval
    
        def get_part_name(self, part):
            """Get name of a part.
    
            When not given, returns None."""
            return self.get_part_ct_param(part, "name", None)
    
        def get_parts_by_name(self, pname):
            """Get a parts by name.
    
            @param pname: Name of the part. This is case sensitive!
    
            Attention! A form may have posted multiple values for the same
            name. So the return value of this method is a list of parts!"""
            res = []
            for part in self.parts:
                name = self.get_part_name(part)
                if name==pname:
                    res.append(part)
            return res
    
        def get_values(self, fnames, size_limit=10*1024):
            """Return a dictionary of values for the given field names.
    
            @param fnames: A list of field names.
            @param size_limit: Maximum size of the value of a single field.
                If a field's size exceeds this then SizeLimitError is raised.
    
            Warning: do not use this for big file values.
            Warning: a form may have posted multiple values for a field name.
                This method returns the first available value for that name.
                To get all values, use the get_parts_by_name method.
            Tip: use get_nonfile_names() to get a list of field names
                that are not originally files.
            """
            res = {}
            for fname in fnames:
                parts = self.get_parts_by_name(fname)
                if not parts:
                    raise KeyError("No such field: %s"%fname)
                size = parts[0]["size"]
                if size>size_limit:
                    raise SizeLimitError("Part size=%s > limit=%s"%(size, size_limit))
                res[fname] = self.get_part_payload(parts[0])
            return res
    
        def get_nonfile_names(self):
            """Get a list of part names are originally not files.
    
            It examines the filename attribute of the content-disposition header.
            Be aware that these fields still may be huge in size."""
            res = []
            for part in self.parts:
                filename = self.get_part_ct_param(part, "filename", None)
                if filename is None:
                    name = self.get_part_name(part)
                    if name:
                        res.append(name)
            return res
    
        def examine(self):
            """Debugging method for examining received data."""
            print("============= structure =============")
            for idx,part in enumerate(self.parts):
                print("PART #",idx)
                print("    HEADERS")
                for header in part["headers"]:
                    print("        ",repr(header.get("name","")),"=",repr(header.get("value","")))
                    params = header.get("params",None)
                    if params:
                        for pname in params:
                            print("            ",repr(pname),"=",repr(params[pname]))
                print("    DATA")
                print("        SIZE", part["size"])
                print("        LOCATION",part["tmpfile"].name)
                if part["size"]<80:
                    print("        PAYLOAD:",repr(self.get_part_payload(part)))
                else:
                    print("        PAYLOAD:","<too long...>")
            print("========== non-file values ==========")
            print(self.get_values(self.get_nonfile_names()))
    
    
        def on_progress(self):
            """Override this function to handle progress of receiving data."""
            pass # Received <self.received> of <self.total>
    

    It could be made a bit more efficient, but it is portable and does not load anything big into memory. Here is how to use it, tested with Tornado 4.0 (and with Firefox and pycurl as clients). Just start this server and point your browser at localhost, port 8888:

    #!/usr/bin/env python3
    from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler, Application, url, stream_request_body
    from tornado.httpserver import HTTPServer
    from post_streamer import PostDataStreamer
    import sys
    
    class MyPostDataStreamer(PostDataStreamer):
        percent = 0
    
        def on_progress(self):
            """Override this function to handle progress of receiving data."""
            if self.total:
                new_percent = self.received*100//self.total
                if new_percent != self.percent:
                    self.percent = new_percent
                    print("progress",new_percent)
    
    @stream_request_body 
    class StreamHandler(RequestHandler):
        def get(self):
            self.write('''<html><body>
    <form method="POST" action="/" enctype="multipart/form-data">
    File #1: <input name="file1" type="file"><br>
    File #2: <input name="file2" type="file"><br>
    File #3: <input name="file3" type="file"><br>
    Other field 1: <input name="other1" type="text"><br>
    Other field 2: <input name="other2" type="text"><br>
    Other field 3: <input name="other3" type="text"><br>
    <input type="submit">
    </form>
    </body></html>''')
    
        def post(self):
            try:
                #self.fout.close()
                self.ps.finish_receive()
                # Use parts here!
                self.set_header("Content-Type","text/plain")
                oout = sys.stdout
                try:
                    sys.stdout = self
                    self.ps.examine()
                finally:
                    sys.stdout = oout
            finally:
                # Don't forget to release temporary files.
                self.ps.release_parts()
    
        def prepare(self):
            # Content-Length is only a hint: it may be absent or invalid.
            try:
                total = int(self.request.headers.get("Content-Length", "0"))
            except (ValueError, TypeError):
                total = 0
            self.ps = MyPostDataStreamer(total) #,tmpdir="/tmp"
            #self.fout = open("raw_received.dat","wb+")
    
        def data_received(self, chunk):
            #self.fout.write(chunk)
            self.ps.receive(chunk)
    
    def main():
        application = Application([
            url(r"/", StreamHandler),
        ])
        max_buffer_size = 4 * 1024**3 # 4GB
        http_server = HTTPServer(
            application,
            max_buffer_size=max_buffer_size,
        )
        http_server.listen(8888)
        IOLoop.instance().start()
    
    main()
    

    After finish_receive() has been called, you can reach each part through PostDataStreamer.parts; the part's Content-Type header is stored in its "headers" list, and content-disposition parameters (name, filename) can be read with PostDataStreamer.get_part_ct_param(part, "filename").
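    For example, a small helper (hypothetical, not part of the module above; it assumes a part dict shaped like the ones PostDataStreamer builds) can pull the client-sent mime type out of a part's parsed headers:

```python
# Sketch: read the Content-Type header from a part dict produced by
# PostDataStreamer (each part has a "headers" list of parsed headers).
def part_content_type(streamer_part, default="application/octet-stream"):
    """Return the mime type the client sent for this part, or a default."""
    for header in streamer_part["headers"]:
        if header.get("name", "").lower().strip() == "content-type":
            return header["value"]
    return default
```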

    UPDATE: normally you should not increase max_buffer_size, and normally you should not increase max_body_size; they should be kept low. Only from the prepare() method of a handler decorated with stream_request_body should you call self.request.connection.set_max_body_size() to set the maximum size that can be streamed. Details here: https://groups.google.com/forum/#!topic/python-tornado/izEXQd71rQk
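    A minimal sketch of that per-handler approach (the handler name and the 4 GiB limit are illustrative, not from the original answer):

```python
# Sketch (Tornado 4.0+): raise the streamable body size for one handler
# from prepare(), instead of raising max_buffer_size server-wide.
import tornado.web

@tornado.web.stream_request_body
class BigUploadHandler(tornado.web.RequestHandler):
    def prepare(self):
        # Allow up to 4 GiB to be streamed through this handler only;
        # the server-wide max_buffer_size / max_body_size stay at their
        # small defaults.
        self.request.connection.set_max_body_size(4 * 1024**3)

    def data_received(self, chunk):
        pass  # feed each chunk to a streaming parser here

    def post(self):
        self.write("upload complete")
```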

    This is an undocumented part of Tornado. I'm preparing a module that can be used to handle file uploads out of the box. When it's ready, I'll put a link here.

    [Discussion]:

    • Wow, nice work! Tested it and it works as expected! One improvement: Tornado limits the body size to 100MB by default. You can call self.request.connection.set_max_body_size(size) in the request handler's initialization.
    • Agreed, this does work after tweaking the parameters a bit, good stuff.
    • Equanox: I'm updating this because, as it turns out, max_body_size and max_buffer_size cannot safely be increased the normal way.
    [Solution 2]:

    In stream_request_body mode you get the raw body as the client uploaded it, without the processing that would normally create self.request.arguments or self.request.files. This is a multipart/form-data wrapper (those are not HTTP headers, although they look similar); you need to parse it to get the filename and the embedded data. I think the mime-related tools in the standard library's email package may be your best option for parsing it after you save it to a file.
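    A minimal sketch of that email-package approach (the function name is illustrative; note the caveat raised in the comments below also applies here: this loads the whole body into memory, so it only suits small uploads). The body alone does not carry the boundary, so the request's Content-Type header must be re-attached before parsing:

```python
# Sketch: parse a saved multipart/form-data body with the stdlib email
# package. `content_type` is the request's Content-Type header value,
# e.g. 'multipart/form-data; boundary=...'.
from email.parser import BytesParser
from email.policy import default

def parse_multipart_file(path, content_type):
    with open(path, "rb") as f:
        raw = f.read()
    # Re-create a minimal MIME message: one header line, blank line, body.
    msg = BytesParser(policy=default).parsebytes(
        b"Content-Type: " + content_type.encode("ascii") + b"\r\n\r\n" + raw
    )
    parts = []
    for part in msg.iter_parts():
        parts.append({
            "name": part.get_param("name", header="content-disposition"),
            "filename": part.get_filename(),
            "content_type": part.get_content_type(),
            "payload": part.get_payload(decode=True),
        })
    return parts
```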

    [Discussion]:

    • Unfortunately not. The best candidate is email.contentmanager (docs.python.org/3/library/email.contentmanager.html), but it also loads the mime message into memory. I wonder if someone has already written a Python function that extracts the files from raw post data without using much memory.
    • Hmm, with the headersonly argument you might be able to process just the headers and get the content type. But not the data...