【问题标题】:MongoDB into a Celery Task - Flask ApplicationMongoDB 变成一个 Celery 任务 - Flask 应用程序
【发布时间】:2018-04-09 23:48:07
【问题描述】:

我正在尝试在我的 Flask 应用程序中使用 Celery。 我在文件insight_tasks.py 中定义了一个任务。 在该文件中定义了一个函数:

@celery_app.task
def save_insights_task()

该函数做了一些事情,然后出现错误,我正在尝试将数据保存到 MongoDB 中,控制台抛出了我: MongoEngineConnectionError('您尚未定义默认连接',) 所以我认为这是因为 MongoEngine 尚未初始化,这是我的问题:

我应该如何在 Celery 任务中使用 MongoDB?因为在我的路由(烧瓶应用程序)上使用 MongoDB 时,它按预期工作。

Celery 不共享数据库实例?

文件:

__init__.py(Celery 初始化)

celery_app = Celery('insights',
                broker=config.CELERY_LOCATIONS_BROKER_URL,
                backend=config.CELERY_LOCATIONS_RESULT_BACKEND,
                include=['app.insight_tasks']
                )

insight_tasks.py

from app.google import google_service
from app.models import LocationStats
from . import celery_app
from firebase_admin import db as firebase_db
import arrow


@celery_app.task
def save_insight_task(account_location, uid, gid, locations_obj, aggregation):

    try:
        insights, reviews = google_service.store_location_resources(
            gid, uid,
            start_datetime, end_datetime,
            account_location, aggregation
        )
    except StandardError as err:
        from pprint import pprint
        import traceback
        pprint(err)
        pprint(traceback.print_exc())

    path = 'saved_locations/{}/accounts/{}'.format(gid, account_location)
    location = [loc for loc in locations_obj if loc['name'] == 'accounts/' + account_location]
    if len(location) > 0:
        firebase_db.reference(path).update(location[0])

这里的google_service.store_location_resources() 是将这些数据保存到MongoDB 中的函数。此功能在我的应用程序的 routes 的另一边使用,因此它按预期工作,除了 Celery 任务

---------

Celery 任务被调用到 POST 请求中

accounts/routes.py

@account.route('/save/queue', methods=['POST'])
def save_all_locations():
    data = request.data
    dataDict = json.loads(data)
    uid = request.headers.get('uid', None)
    gid = request.headers.get('gid', None)

    account_locations = dataDict['locations']
    locations_obj = dataDict['locations_obj']
    for path in account_locations:
        save_insight_task.delay(account_location=path, uid=uid, gid=gid, locations_obj=locations_obj, aggregate='SOME_TEXT')

【问题讨论】:

    标签: python mongodb flask celery


    【解决方案1】:

    您应该连接到任务内的数据库。原因是子进程(由 Celery 创建)必须有自己的 mongo 客户端实例。

    更多详情here : Using PyMongo with Multiprocessing

    例如定义一个utils.py

    from pymodm import connect
    def mongo_connect():
        return connect("mongodb://{0}:{1}/{2}".format(MONGODB['host'],
                                        MONGODB['port'],
                                        MONGODB['db_name']),
                                        alias=MONGODB['db_name'])
    

    然后在 insight_tasks.py

    from utils import mongo_connect
    @celery_app.task
    def save_insight_task(account_location, uid, gid, locations_obj, aggregation):
        # connect to mongodb
        mongo_connect()
        # do your db operations
        try:
            insights, reviews = google_service.store_location_resources(
                gid, uid,
                start_datetime, end_datetime,
                account_location, aggregation
            )
        except StandardError as err:
            from pprint import pprint
            import traceback
    
            pprint(err)
            pprint(traceback.print_exc())
    
        path = 'saved_locations/{}/accounts/{}'.format(gid, account_location)
        location = [loc for loc in locations_obj if loc['name'] == 'accounts/' + account_location]
        if len(location) > 0:
            firebase_db.reference(path).update(location[0])
    

    请注意,我使用 pymodm 包而不是 mongoengine 包作为 mongo 的 ODM。

    【讨论】:

      猜你喜欢
      • 2015-12-05
      • 1970-01-01
      • 2019-05-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-15
      • 2017-06-28
      • 2017-02-02
      相关资源
      最近更新 更多