【问题标题】:Celery queue fetch multiple messages at the same time芹菜队列同时获取多条消息
【发布时间】:2021-09-15 20:13:42
【问题描述】:

我正在开发一个基于 docker 的 Celery 支持的 Python 应用程序,其中一个功能是触发和发送给定数字的文本消息。工作流程如下:

  1. 用户上传包含一组条目的 CSV,文本消息应发送到这些条目
  2. cron 作业每 60 秒轮询一次数据库以获取任何新条目并将它们添加到队列中
  3. 如果发现新条目,将它们放入队列并触发短信

目前,如果我上传一个包含 3 个条目的 CSV 文件,每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条文本消息所用的时间将是 30 秒。 由于这些作业是相互独立的,我想将其并行化,以便同时发送所有三个文本消息。

我已尝试增加队列的并发性,但假设每个线程将被分配三个消息之一,但它不起作用。恐怕我可能缺少一些东西。 是否需要添加一些其他配置才能并行化作业?

运行芹菜队列的命令

celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

芹菜配置


app = Celery(
    'worker',
    broker=os.environ['CELERY_BROKER'],
    backend=os.environ['RABBITMQ_BACKEND'],
    include=['worker.tasks','worker.schedule']
)



app.conf.update(
    result_expires=3600,
    task_track_started=True,
    worker_prefetch_multiplier = 5
)

app.conf.beat_schedule = {
    "get-message": {
        "task": "worker.schedule.get_new_messages",
        "schedule": 10,
        'options': {'queue' : 'queue1'}
    }
}

【问题讨论】:

    标签: python docker rabbitmq celery


    【解决方案1】:

    你可以使用 celery 中的组

    组用于并行执行任务。 group 函数接受签名列表。

    可能有助于参考,请参阅下面的文档

    https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/

    【讨论】:

      【解决方案2】:

      解决此问题的最佳方法是修复 cron 作业。在您的 cron 作业中,不是将消息放入队列,而是调用您的 celery 任务。目前按顺序处理消息的原因取决于您对worker.schedule.get_new_messages 的实现。最有可能的是,该函数一次从队列中拉出多条消息,而处理这些消息的函数一次只处理一条。

      解决这个问题的方法是创建一个发送一条消息的任务,仅此而已。例如:

      @app.task('send_my_cool_message')
      def send_sms_message(from_, to_, text):
          twilio_client = Client(settings.ACCOUNT_SID, settings.AUTH_TOKEN)
          twilio_client.messages.create(to=to_, from=from_, body=text)
      

      现在在您的 cron 作业中,您为每条消息调用一个 celery 任务:

      from celery import Celery
      qs = Messages.objects.filter(created_at__gte=last_date_polled)
      app = Celery(broker=settings.BROKER_URL, backend=settings.BACKEND_URL)
      for message in qs:
          app.send_task('send_my_cool_message', kwargs={
              'from_': message.from_,
              'to_': message.to_,
              'text': message.text,
          })
      

      【讨论】: