【问题标题】:use celery for dynamic tasks使用 celery 执行动态任务
【发布时间】:2020-09-03 21:00:50
【问题描述】:

这是想法/工作流程:

我有一堆需要与之通信的网络设备。

一个。客户端(flask/python)向 Celery 发送请求:“mac-table sw1”。

b. celery 查看可用的工人/任务,如果不存在负责 sw1 的工人 - 它将创建一个并将工作传递给它。

对 sw1 的所有后续请求都将自动转发给现有工作人员(从而消除为每个请求建立会话并限制与设备的并发会话)

c。如果worker闲置一段时间,它会关闭与设备的连接并退出。

问题:Celery 适合这个工作流程吗?你知道我可以从中获得灵感的类似工作流程的任何示例吗?

谢谢!

【问题讨论】:

    标签: python celery


    【解决方案1】:

    是的,你可以。

    • 通过传递所需的 task_id 从 Flask 端点调用您的 celery 任务(首先查询任务 ID,如果它不存在,则调用该任务)。
    • 您可以设置任务的时间限制,如下所述: Setting Time Limit on specific task with celery
    @app.route('/task', methods=['POST'])
    def upload(params):
        # query, validate and create new task
        do_task.apply_async(do_task, args=(params,), task_id="mac-table-sw1")
    
    @celery.task(bind=True)
    def do_task(self, params):
        pass
    
    @app.route('/status/<task_id>')
    def taskstatus(task_id):
        task = extract_keywords_task.AsyncResult("mac-table-sw1")
        return task.state
    

    【讨论】:

    • 谢谢,但是,当我使用 ID mac-table-sw1 开始我的任务时,我将在任务中创建例如 netmiko 对象(到交换机的 ssh 连接),将其用于当前任务,返回结果和 celery 将以 ID mac-table-sw1 结束任务并删除 netmiko 对象。下次我运行我的任务时,它将创建新实例,我将不得不再次执行连接部分。
    【解决方案2】:

    如果我理解得很好,您可以通过为特定作业动态创建队列、选择一个可用的工作人员并将其订阅到该队列来实现。它还需要一些清理任务来删除未使用的队列(它可以是每 N 分钟运行一次的任务,检查队列,检查那里是否有任何东西在运行,如果没有取消订阅该队列的工作人员)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-09-28
      • 1970-01-01
      • 2017-06-16
      • 1970-01-01
      • 1970-01-01
      • 2015-09-07
      • 2013-06-30
      • 2020-07-01
      相关资源
      最近更新 更多