【问题标题】:How to use celery for inserting data to mongodb using mongoengine如何使用 celery 使用 mongoengine 向 mongodb 插入数据
【发布时间】:2026-01-31 21:55:01
【问题描述】:

我正在尝试使用 celery 在我的 mongodb 中插入大数据,但问题是 mongodb 中的并发性。如果我一次向 celery 发送多个任务,则部分数据将插入 mongodb,而其他一些数据则不会。我认为这是因为 mongodb 在插入操作时锁定了数据库,但我需要一个能够发送多个相同类型的任务以在数据库中插入数据的解决方案。就像在等待它解锁时检查数据库是否被锁定。这是我的代码的一部分:

@celery.task(name='celery_tasks.add_book_product')
def add_book_product(product_dict, store_id):

    connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST)

    store_obj = Store.objects.get(pk=store_id)

    try:
        book = Books.objects.get(pk=product_dict['RawBook'])

        try:
            product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book')
            print("Product {} found for store {}".format(product_obj.id, store_obj.id))
            product_obj.count = int(product_dict['count'])
            product_obj.buy_price = int(product_dict['buy_book'])
            product_obj.sell_price = int(product_dict['sell_book'])

            product_obj.save()

        except (DoesNotExist, ValidationError):
            product_obj = Product(store=store_obj,
                                  related_book=book,
                                  kind='book',
                                  count=int(product_dict['count']),
                                  buy_price=int(product_dict['buy_book']),
                                  sell_price=int(product_dict['sell_book']),
                                  name=book.name_fa)

            product_obj.save()

            print("Appending books to store obj...")
            store_obj.products.append(product_obj)
            store_obj.save()
            print("Appending books to store obj done")

        return "Product {} saved for store {}".format(product_obj.id, store_obj.id)
    except (DoesNotExist, ValidationError):
        traceback.print_exc()
        return "Product with raw book {} does not exist.".format(product_dict['RawBook'])

【问题讨论】:

    标签: python mongodb concurrency celery celery-task


    【解决方案1】:

    默认情况下,多处理用于在 celery 中执行并发执行任务。但是有两种方法可以确保在任何给定时间只执行一个任务。

    解决方案 1:

    当你开始使用 celery worker 时

    celery -A your_app worker -l info
    

    默认并发数等于您的机器拥有的内核数。所以如果你像这样开始一个工人

    celery -A your_app worker -l info -c 1
    

    它在任何给定时间只运行一项任务。如果您还有其他一些任务需要执行,您可以启动一个新队列并分配一个工作人员来执行。

    解决方案 2:

    这有点复杂。你需要在你的任务中使用锁,就像这样。

    if acquire_lock():
        try:
            #do something
        finally:
            release_lock()
        return 
    

    您可以在Celery documentation 中阅读更多相关信息。

    【讨论】:

      最近更新 更多