【问题标题】:How to upload CSV, process it in the background and make new file available to download in Flask如何上传 CSV,在后台处理并在 Flask 中下载新文件
【发布时间】:2015-03-16 01:46:59
【问题描述】:

我正在构建一个内部工作工具,只是为了让一些人的生活更轻松。该任务涉及一个 CSV 作为输入,该输入得到处理并生成一个新文件。那部分已经处理好了,我制作了一个可以工作的命令行脚本。但我想让它尽可能易于访问,因此 Web UI 是一个自然的选择。我玩了一下烧瓶,并用它构建了另一个简单的内部工具,我想对这个做同样的事情。

我的这个脚本可能需要一段时间来处理一个文件,有时需要几分钟。这意味着我想让它在 Web UI 中不阻塞。理想情况下,用户选择文件,选择几个选项点击提交,作业在后台启动。完成后应该有一个进度条向用户显示。我不打算在这里进行用户管理,但任何人都可以查看任何人触发的所有工作的列表。作业完成后,应该有一个链接可以下载已处理的文件。

根据我的阅读,这是任务队列系统与消息代理(如 Celery 和 Redis)一起使用的情况。我已经阅读了一些关于这个的教程,但我仍然不确定如何去做......从哪里开始?解决此类问题的最佳架构是什么?另外,如何最好地处理上传部分,也许使用 ajax?

如果可能的话,我正在寻找一些建议、参考和代码示例。任何与我想做的事情相关的例子都值得赞赏。

【问题讨论】:

  • 您甚至可以在任务完成后发送电子邮件通知用户 (mandrill.com)

标签: python flask


【解决方案1】:

从您发布的内容来看,您似乎走在了正确的轨道上。您想要完成的是将资源需求任务卸载给“stub-workers”,并让您的 Web 层专注于向您的用户呈现。

您希望使用的模式通常称为“发布/订阅”模式:

您的应用程序会将 Celery 任务“发布”到消息代理 (Rabbit MQ)。您的 Celery 任务工作者将“订阅”消息代理并寻找要执行的任务。非常简单的模式。

鉴于您使用的标签,我假设您熟悉 Flask(出色的微框架),并建议您查看“Flask-Uploads”以满足您的上传需求。

为了处理您的 CSV 文件,只需创建函数本身并用 @task 装饰器包装它,让 Celery 将其识别为任务。使用一点“Google-fu”,我发现了这个可能对您有用的特定文件:

https://github.com/captricity/captricity-cloud-io/blob/master/captricity_cloud_io/captricity_cloud_io/tasks.py

查看_upload_to_google 函数以了解如何处理此问题。

最后,我最喜欢的作者之一 Miguel Grinberg 写了一篇关于 Celery 和 Flask 的精彩文章,并使用进度条来跟踪您的进度:

http://blog.miguelgrinberg.com/post/using-celery-with-flask

请注意,该示例使用 JavaScript 每 2 秒检查一次给定的 Celery 任务。如果您不期望有很多流量,您当然可以侥幸成功,但如果容量要增长,我建议您研究一下 Web 套接字。

希望这能让您进入一个良好的起点。祝你好运!

【讨论】:

    【解决方案2】:

    看来你是解决问题的方法之一

    我们需要一名后台工作人员来处理大量数据/文件。如果需要很好的处理时间。 在处理我们无法控制的后台任务时,我们需要数据库帮助来跟踪作业状态

    按照步骤完成任务,

    第 1 步:在您的主要路线中执行主要步骤,例如读取请求(例如:.csv 文件)并进行验证,

    第 2 步:创建一个查找表以跟踪工作状态 此表是一个占位符,用于跟踪您的后台作业状态

    • 最初它将状态保存为队列并将作业返回到进程#step5
    • 第三步
    • 返回作业 ID

    第三步: redis enque : 通过redis enque调用处理辅助函数并传递所有需要的args,根据需要设置超时值

    第 4 步:进行处理并更新状态记录成功/失败

    第 5 步:创建另一个路由以返回该作业的当前状态,对该路由进行持续 AJAX 调用,直到状态更改为完成

    通过下面的示例了解更多详细信息,假设您已经为 #ref

    做好了所有 redis 设置(如安装、worke.py、建立 redis 连接等)

    Modles.py(我正在使用 Mongo DB 作为 ex)

    class TempCsvLookup(Base, Document):
        id = SequenceField(unique=True)
        job_id = StringField()
        file_url = StringField()
        status = StringField()
        created_at = DateTimeField(default=dt.utcnow(), required=True)
        finished_at = DateTimeField(default=(dt.utcnow()  + datetime.timedelta(hours=24)))
    

    views.py

    @route('/upload_csv_file', methods=['POST'])
    @require_login
    def upload_csv_file():
    
        #step 1
        csv_file = request.form.get('file')
        '''
            do validations
        '''
        #Step 2: create a initial DB record
        look_up = TempCsvLookup(
            status = "Queued"
        ).save()
    
        #step_3 : calling backgroud task using redis
        job = redis_queue.enqueue(
                process_csv_file,
                args = (
                    look_up, csv_file, other_args 
                ),
                timeout = 1200
            )
        job.get_id()
    
        look_up.update(
            set__job = job.get_id(),
        )
    
        return jsonfy(job_id = str(job.get_id()))
    
    def process_csv_file(look_up, csv_file, other_args):
        try:
            #step 4 process csv file
            look_up.update(
                set__status = "Processing",
            )
    
            """ 
               1.do all processing with input csv file
               2.create new csv file
            """
            look_up.update(
                set__status = "Completed",
                set__file_url = new_file_path,
                set__finished_at = dt.utcnow()
            )
        except Exception as e:
            look_up.update(
                set__status = "Failed",
                set__finished_at = dt.utcnow()
            )
    
    @route('/csv_file_lookup/<string:lookup_job_id>', methods=['GET'])
    @require_login
    def csv_file_lookup(lookup_job_id):
        #step5
        report = TempCsvLookup.objects(job_id=lookup_job_id).first()
    
        resp = {
            'status': report.status,
            'file_url': report.file_url
        }
    
        return response.success(data=resp)
    

    希望对你有帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-01-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-07-17
      相关资源
      最近更新 更多