【发布时间】:2019-01-31 06:44:51
【问题描述】:
我们在 Ubuntu 14.04 上使用 Django、Celery 和 Rabbitmq 设置了 Python 3.6.1。现在,我正在使用 Django 调试服务器(对于 dev 和 Apache 不工作)。我目前的问题是芹菜工人从 Python 启动并立即死亡——进程显示为已失效。如果我在终端窗口中使用相同的命令,如果队列中有一个等待,则创建工作人员并接手任务。
命令如下:
celery worker --app=myapp --loglevel=info --concurrency=1 --maxtasksperchild=20 -n celery_1 -Q celery
无论设置哪个队列,都会出现相同的功能。
在终端中,我们看到输出myapp.settings - INFO - Loading...,后面跟着描述队列和列出任务的输出。从 Python 运行时,我们看到的最后一件事是 Loading...
在代码中,我们确实进行了检查,以确保我们没有以 root 身份运行 celery 命令。
这些是我们settings.py 文件中的 Celery 设置:
CELERY_ACCEPT_CONTENT = ['json','pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ('api.tasks',)
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_CONCURRENCY = 1
BROKER_POOL_LIMIT = 120 # Note: I tried this set to None but it didn't seem to make any difference
CELERYD_LOG_COLOR = False
CELERY_LOG_FORMAT = '%)asctime)s - $(processName)s - %(levelname)s - %(message)s'
CELERYD_HIJACK_ROOT_LOGGER = False
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(psconf.BASE_DIR, 'myapp_static/')
BROKER_URL = psconf.MQ_URI
CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True
CELERY_ROUTES = {}
for entry in os.scandir(psconf.PLUGIN_PATH):
if not entry.is_dir() or entry.name == '__pycache__':
continue
plugin_dir = entry.name
settings_file = f'{plugin_dir}.settings'
try:
plugin_tasks = importlib.import_module(settings_file)
queue_name = plugin_tasks.QUEUENAME
except ModuleNotFoundError as e:
logging.warning(e)
except AttributeError:
logging.debug(f'The plugin {plugin_dir} will use the general worker queue.')
else:
CELERY_ROUTES[f'{plugin_dir}.tasks.run'] = {'queue': queue_name}
logging.debug(f'The plugin {plugin_dir} will use the {queue_name} queue.')
这是启动工人的部分:
class CeleryWorker(BackgroundProcess):
def __init__(self, n, q):
self.name = n
self.worker_queue = q
cmd = f'celery worker --app=myapp --loglevel=info --concurrency=1 --maxtasksperchild=20 -n {self.name" -Q {self.worker_queue}'
super().__init__(cmd, cwd=str(psconf.BASE_DIR))
class BackgroundProcess(subprocess.Popen):
def __init__(self, args, **kwargs):
super().__init__(args, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, **kwargs)
感谢任何有关如何从 Python 中获得此功能的建议。我是 Rabbitmq/Celery 的新手。
【问题讨论】:
-
你提到了
the celery workers get launched from Python and immediately die... 但我没有看到任何真正启动 celery worker 的 Python 代码。您能否将其包括在内或更详细地描述这一点? -
@sytech - 我已经编辑了帖子以包含代码 sn-ps 以显示工作人员是如何启动的。
-
这个“CeleryWorker”在哪里以及如何使用?但无论如何:没有理由你的 django 代码应该启动 celery worker,并且考虑到内置的开发服务器和 wsgi 连接器都倾向于启动和停止 Django 子进程,你有很好的理由不希望你的 django 启动 celery worker。
-
我们有一个管理器 GUI,可以在启动时启动工作人员。我们获取标准输出和标准错误并在 GUI 中显示它们。这一切都像 Windows 10 下的魅力一样。我试图让它在 Linux 中运行,这就是痛苦开始的地方。