【问题标题】:Continously running Celery Task in Django在 Django 中持续运行 Celery 任务
【发布时间】:2020-06-30 07:48:10
【问题描述】:

我有一个 Django 应用程序,它应该不断地监听来自 Kafka 的消息,然后通过 WebSocket 将它们发送到客户端。问题是如何设置常量监听器。为了未来的可扩展性,我们决定将 Celery 引入项目中,以通过扩展来管理这些问题。

我的任务实际上是这样的:

class ConsumerTask(Task):
    name = 'consume_messages'

    def run(self, *args, **kwargs):
        consumer = get_kafka_consumer(settings.KAFKA_URL,
                                      settings.FAULT_MESSAGES_KAFKA_TOPIC,
                                      'consumer_messages_group')
        logger.info("Kafka's consumer has been started")

        while True:
            messages = consumer.poll()
            for _, messages in messages.items():
                messages, messages_count = self.get_message(messages)
                if messages_count > 0:
                    messages = save_to_db()
                    send_via_websocket_messages(messages)

它通过WS正确保存和发送消息,但问题来自任务中的无限循环。 出于某种原因(可能是任务超时限制),任务会在队列中弹出并且不再运行。 我不确定 celery 工人的守护进程能否解决这个问题。 您能否提供一些策略来组织这个过程的“持续运行部分”?

【问题讨论】:

    标签: django apache-kafka celery celeryd


    【解决方案1】:

    您的用例适合 celery 任务。 Celery 任务不应该是长时间运行的进程。您需要将任务放入代理​​队列中,这在您的设置中也没有任何意义。

    想想你的while-True-loop,而不是一个celery worker。工人应该不断运行,这也是您在处理更多任务时需要扩展的过程。

    编写一个Django management command,使用您的 while-True 循环,并使用您将用于扩展 celery 工作人员的扩展类型来运行该管理命令的多个实例。

    使用流程管理工具来扩展流程,例如 honchosupervisord

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-03-17
      • 2023-03-24
      • 1970-01-01
      • 2022-11-14
      • 1970-01-01
      • 2013-12-16
      • 2020-03-20
      • 2011-04-04
      相关资源
      最近更新 更多