【问题标题】:Unit testing with django-celery?用 django-celery 进行单元测试?
【发布时间】:2025-12-15 14:55:01
【问题描述】:

我正在尝试为我们的django-celery 项目提出一种测试方法。我已经阅读了documentation 中的注释,但它并没有让我很好地了解实际应该做什么。我不担心测试实际守护进程中的任务,只担心 my 代码的功能。主要是我想知道:

  1. 我们如何在测试期间绕过task.delay()(我尝试设置CELERY_ALWAYS_EAGER = True,但没有任何区别)?
  2. 我们如何在不实际更改 settings.py 的情况下使用推荐的测试设置(如果这是最好的方法)?
  3. 我们还可以使用manage.py test 还是必须使用自定义跑步者?

总的来说,任何用 celery 进行测试的提示或技巧都会非常有帮助。

【问题讨论】:

  • 你是什么意思CELERY_ALWAYS_EAGER 没有区别?
  • 我仍然收到有关无法联系rabbitmq的错误。
  • 你有追溯吗?我猜.delay 以外的其他东西可能正在尝试建立连接。
  • 设置 BROKER_BACKEND=memory 在这种情况下会有所帮助。
  • 问你是对的。 BROKER_BACKEND=memory 已修复。如果您将其作为答案,我会将其标记为正确。

标签: python django unit-testing celery


【解决方案1】:

我喜欢在需要完成 celery 结果的测试中使用 override_settings 装饰器。

from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask

class AddTestCase(TestCase):

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        result = mytask.delay()
        self.assertTrue(result.successful())

如果您想将此应用于所有测试,您可以使用http://docs.celeryproject.org/en/2.5/django/unit-testing.html 中描述的 celery 测试运行器,它基本上设置了这些相同的设置,除了 (BROKER_BACKEND = 'memory')。

在设置中:

TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'

查看 CeleryTestSuiteRunner 的源代码,很清楚发生了什么。

【讨论】:

  • 这不适用于 celery 4,即使字段重命名为 here
  • 适用于 Celery 3.1。我只是让我的 Celery 测试用例从带有这个装饰器的父类继承。这样它只需要在一个地方,不需要拉入djcelery
  • 这在 Celery 4.4 上效果很好。和 Django 2.2。迄今为止我遇到的运行单元测试的最佳方法。
【解决方案2】:

尝试设置:

BROKER_BACKEND = 'memory'

(感谢asksol的评论。)

【讨论】:

【解决方案3】:

这是我的测试基类的摘录,它剔除了 apply_async 方法并记录对它的调用(包括 Task.delay。)这有点恶心,但在过去的几次中它设法满足了我的需求我已经用了几个月了。

from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

class CeleryTestCaseBase(TestCase):

    def setUp(self):
        super(CeleryTestCaseBase, self).setUp()
        self.applied_tasks = []

        self.task_apply_async_orig = Task.apply_async

        @classmethod
        def new_apply_async(task_class, args=None, kwargs=None, **options):
            self.handle_apply_async(task_class, args, kwargs, **options)

        # monkey patch the regular apply_sync with our method
        Task.apply_async = new_apply_async

    def tearDown(self):
        super(CeleryTestCaseBase, self).tearDown()

        # Reset the monkey patch to the original method
        Task.apply_async = self.task_apply_async_orig

    def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
        self.applied_tasks.append((task_class, tuple(args), kwargs))

    def assert_task_sent(self, task_class, *args, **kwargs):
        was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
                       for task in self.applied_tasks)
        self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args))

    def assert_task_not_sent(self, task_class):
        was_sent = any(task_class == task[0] for task in self.applied_tasks)
        self.assertFalse(was_sent, 'Task was not expected to be called, but was.  Applied tasks: %s' %                 self.applied_tasks)

这里有一个“不经意间”的示例,说明您将如何在测试用例中使用它:

mymodule.py

from my_tasks import SomeTask

def run_some_task(should_run):
    if should_run:
        SomeTask.delay(1, some_kwarg=2)

test_mymodule.py

class RunSomeTaskTest(CeleryTestCaseBase):
    def test_should_run(self):
        run_some_task(should_run=True)
        self.assert_task_sent(SomeTask, 1, some_kwarg=2)

    def test_should_not_run(self):
        run_some_task(should_run=False)
        self.assert_task_not_sent(SomeTask)

【讨论】:

    【解决方案4】:

    因为我仍然在搜索结果中看到这个,设置覆盖

    TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
    

    按照Celery Docs为我工作

    【讨论】:

      【解决方案5】:

      对于 2019 年来到这里的每个人:结帐this article 涵盖不同的策略,包括同步调用任务。

      【讨论】:

        【解决方案6】:

        这就是我所做的

        在 myapp.tasks.py 我有:

        from celery import shared_task
        
        @shared_task()
        def add(a, b):
            return a + b
        

        在 myapp.test_tasks.py 我有:

        from django.test import TestCase, override_settings
        from myapp.tasks import add
        
        
        class TasksTestCase(TestCase):
        
            def setUp(self):
                ...
        
            @override_settings(CELERY_TASK_ALWAYS_EAGER=True,CELERY_TASK_EAGER_PROPOGATES=True)
            def test_create_sections(self):
                result= add.delay(1,2)
                assert result.successful() == True
                assert result.get() == 3
        

        【讨论】: