【问题标题】:How to replace celery task with azure service bus in a django application?如何在 django 应用程序中用 azure 服务总线替换 celery 任务?
【发布时间】:2020-12-20 14:39:16
【问题描述】:

我被要求在 Django 应用程序中使用 azure 服务总线而不是 celery。

阅读提供的文档,但没有清楚地了解使用服务总线而不是 celery 任务。提供的任何建议都会有很大帮助。

【问题讨论】:

  • 只是为了重新解释 - 你正在使用 celery 任务执行什么操作?
  • @sathya_vijayakumar-MSFT 我正在使用 celery 运行异步任务,并且运行良好。突然间,我被要求使用服务总线。不知道如何使用服务总线而不是芹菜。
  • 你的芹菜会在特定条件下触发吗?
  • 是的,它被触发了。如果可能的话,我希望服务总线也能发生同样的事情。

标签: python django celery azure-servicebus-queues


【解决方案1】:

在进入之前,我想强调一下 Azure Service Bus 和 Celery 之间的区别。

Azure 服务总线:

Microsoft Azure 服务总线是一个完全托管的企业集成消息代理。

您可以参考this 了解更多关于服务总线的信息

芹菜:

分布式任务队列。 Celery 是一个基于分布式消息传递的异步任务队列/作业队列。

在您的情况下,我可以想到 2 种可能性:

  1. 您想使用带有 Celery 的 Service Bus 来代替其他 消息代理。
  2. 用服务总线替换 Celery

1:您希望使用带有 Celery 的 Service Bus 来代替其他消息代理。

您可以将此提交给understand why celery needs a message broker。 我不确定您当前使用的是哪个消息传递代理,但您可以使用 Kombu library 来满足您的要求。

Azure 服务总线参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html

供他人参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html

2 : 完全用服务总线替换 Celery 为了满足您的要求:

考虑

  • 消息发送者是生产者
  • 消息接收者是消费者

这是您必须处理的两个不同的应用程序。

您可以参考以下内容以获取更多示例代码以进行构建。

https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples

说明:

  • 每次你想执行动作时,你可以发送 从生产者客户端向主题发送消息。
  • 消费者客户端 - 正在侦听的应用程序将接收消息并进行相同处理。您可以将自定义流程附加到它 - 这样,只要在消费者客户端收到消息,您的自定义流程就会执行。

以下是接收客户端的示例:

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

        
Receiving = True

#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()

async def receive_message_from1():
    await receiver.open()
    print("Opening the Receiver for Topic1")
    async with receiver:
      while(Receiving):
        msgs =  await receiver.fetch_next()
        for m in msgs:
            print("Received the message from topic 1.....")
            ##### - Your code to execute when a message is received - ########
            print(str(m))
            ##### - Your code to execute when a message is received - ########
            await m.complete()
            
            
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())

下一行之间的部分将是每次收到消息时都会执行的指令。

##### - Your code to execute when a message is received - ########

【讨论】:

    猜你喜欢
    • 2019-01-10
    • 2012-08-09
    • 1970-01-01
    • 1970-01-01
    • 2017-02-02
    • 1970-01-01
    • 2017-01-25
    • 2017-05-07
    • 1970-01-01
    相关资源
    最近更新 更多