【问题标题】:Starting celery worker from multiprocessing从多处理开始芹菜工人
【发布时间】:2015-07-04 15:34:27
【问题描述】:

我是芹菜新手。我见过的所有示例都从命令行启动 celery worker。例如:

$ celery -A proj worker -l info

我正在启动一个关于弹性 beanstalk 的项目,并认为让 worker 成为我的 web 应用程序的子进程会很好。我尝试使用多处理,它似乎工作。我想知道这是否是一个好主意,或者是否有一些缺点。

import celery
import multiprocessing


class WorkerProcess(multiprocessing.Process):
    def __init__(self):
        super().__init__(name='celery_worker_process')

    def run(self):
        argv = [
            'worker',
            '--loglevel=WARNING',
            '--hostname=local',
        ]
        app.worker_main(argv)


def start_celery():
    global worker_process
    worker_process = WorkerProcess()
    worker_process.start()


def stop_celery():
    global worker_process
    if worker_process:
        worker_process.terminate()
        worker_process = None


worker_name = 'celery@local'
worker_process = None

app = celery.Celery()
app.config_from_object('celery_app.celeryconfig')

【问题讨论】:

  • 有趣的是,这段代码正在使用 same Celery 实例,用于应用程序和工作程序。否则创建一个工人,例如命令行,似乎总是创建一个新的 Celery 实例。我不知道这是否是一个问题......

标签: python flask multiprocessing celery amazon-elastic-beanstalk


【解决方案1】:

似乎是一个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)

您可能想要研究的一件事(您可能已经在这样做)是将自动缩放与您的 Celery 队列的大小相关联。因此,您只有在队列增长时才扩大规模。

Effectively Celery 当然在内部做了类似的事情,所以没有太大区别。我能想到的唯一障碍是处理外部资源(例如数据库连接),这可能是个问题,但完全取决于您使用 Celery 所做的事情。

【讨论】:

    【解决方案2】:

    如果有人感兴趣,我确实可以在 Elastic Beanstalk 上使用运行 Python 3.4 的预配置 AMI 服务器来完成这项工作。我在运行 Debian Jessie 的基于 Docker 的服务器上遇到了很多问题。也许与端口重新映射有关。 Docker 有点像一个黑盒子,我发现它很难使用和调试。幸运的是,AWS 的好人刚刚在 2015 年 4 月 8 日添加了一个非 docker Python 3.4 选项。

    我进行了很多搜索以使其部署和工作。我看到很多问题没有答案。所以这是我部署的非常简单的 python 3.4/flask/celery 进程。

    Celery 你可以直接 pip 安装。您需要使用配置命令或 container_command 从配置文件安装 rabbitmq。我在上传的项目 zip 中使用了一个脚本,因此需要一个 container_command 才能使用该脚本(常规的 eb config 命令在项目安装之前执行)。

    [yourapproot]/.ebextensions/05_install_rabbitmq.config:

    container_commands:
      01RunScript:
        command: bash ./init_scripts/app_setup.sh
    

    [yourapproot]/init_scripts/app_setup.sh:

    #!/usr/bin/env bash
    
    # Download and install Erlang
    yum install erlang
    
    # Download the latest RabbitMQ package using wget:
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm
    
    # Install rabbit
    rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
    yum -y install rabbitmq-server-3.5.1-1.noarch.rpm
    
    # Start server
    /sbin/service rabbitmq-server start
    

    我正在做一个烧瓶应用程序,所以我在第一个请求之前启动了工作人员:

    @app.before_first_request
    def before_first_request():
        task_mgr.start_celery()
    

    task_mgr 创建 celery app 对象(我称之为 celery,因为烧瓶 app 对象是 app)。 -Ofair 在这里非常关键,对于一个简单的任务管理器。任务预取有各种奇怪的行为。这应该是默认的吧?

    task_mgr/task_mgr.py:

    import celery as celery_module
    import multiprocessing
    
    
    class WorkerProcess(multiprocessing.Process):
        def __init__(self):
            super().__init__(name='celery_worker_process')
    
        def run(self):
            argv = [
                'worker',
                '--loglevel=WARNING',
                '--hostname=local',
                '-Ofair',
            ]
            celery.worker_main(argv)
    
    
    def start_celery():
        global worker_process
        multiprocessing.set_start_method('fork')  # 'spawn' seems to work also
        worker_process = WorkerProcess()
        worker_process.start()
    
    
    def stop_celery():
        global worker_process
        if worker_process:
            worker_process.terminate()
            worker_process = None
    
    
    worker_name = 'celery@local'
    worker_process = None
    
    celery = celery_module.Celery()
    celery.config_from_object('task_mgr.celery_config')
    

    到目前为止,我的配置非常简单:

    task_mgr/celery_config.py:

    BROKER_URL = 'amqp://'
    CELERY_RESULT_BACKEND = 'amqp://'
    
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TASK_SERIALIZER = 'json'  # 'pickle' warning: can't use datetime in json
    CELERY_RESULT_SERIALIZER = 'json'  # 'pickle' warning: can't use datetime in json
    CELERY_TASK_RESULT_EXPIRES = 18000  # Results hang around for 5 hours
    
    CELERYD_CONCURRENCY = 4
    

    然后你可以把任务放在你需要的地方:

    from task_mgr.task_mgr import celery
    import time
    
    
    @celery.task(bind=True)
    def error_task(self):
        self.update_state(state='RUNNING')
        time.sleep(10)
        raise KeyError('im an error')
    
    
    @celery.task(bind=True)
    def long_task(self):
        self.update_state(state='RUNNING')
        time.sleep(20)
        return 'long task finished'
    
    
    @celery.task(bind=True)
    def task_with_status(self, wait):
        self.update_state(state='RUNNING')
        for i in range(5):
            time.sleep(wait)
            self.update_state(
                state='PROGRESS',
                meta={
                    'current': i + 1,
                    'total': 5,
                    'status': 'progress',
                    'host': self.request.hostname,
                }
            )
        time.sleep(wait)
        return 'finished with wait = ' + str(wait)
    

    我还保留了一个任务队列来保存异步结果,以便我可以监控任务:

    task_queue = []
    
    
    def queue_task(task, *args):
        async_result = task.apply_async(args)
        task_queue.append(
            {
                'task_name':task.__name__,
                'task_args':args,
                'async_result':async_result
            }
        )
        return async_result
    
    
    def get_tasks_info():
        tasks = []
    
        for task in task_queue:
            task_name = task['task_name']
            task_args = task['task_args']
            async_result = task['async_result']
            task_id = async_result.id
            task_state = async_result.state
            task_result_info = async_result.info
            task_result = async_result.result
            tasks.append(
                {
                    'task_name': task_name,
                    'task_args': task_args,
                    'task_id': task_id,
                    'task_state': task_state,
                    'task_result.info': task_result_info,
                    'task_result': task_result,
                }
            )
    
        return tasks
    

    当然,在你需要的地方开始任务:

    from webapp.app import app
    from flask import url_for, render_template, redirect
    from webapp import tasks
    from task_mgr import task_mgr
    
    
    @app.route('/start_all_tasks')
    def start_all_tasks():
        task_mgr.queue_task(tasks.long_task)
        task_mgr.queue_task(tasks.error_task)
        for i in range(1, 9):
            task_mgr.queue_task(tasks.task_with_status, i * 2)
    
        return redirect(url_for('task_status'))
    
    
    @app.route('/task_status')
    def task_status():
        current_tasks = task_mgr.get_tasks_info()
        return render_template(
            'parse/task_status.html',
            tasks=current_tasks
        )
    

    就是这样。如果您需要任何帮助,请告诉我,尽管我对芹菜的了解仍然相当有限。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-09-23
      • 1970-01-01
      • 1970-01-01
      • 2021-10-24
      • 1970-01-01
      • 2014-08-17
      • 2013-10-04
      相关资源
      最近更新 更多