【问题标题】:Comsuming MassTransit from Python or other languages从 Python 或其他语言使用 Mass Transit
【发布时间】:2014-01-27 15:44:06
【问题描述】:

我在 MassTransit 中完成了一个简单的发布者。我在一个间隔内发送消息,并且能够使用 MassTransit 从 .NET 客户端接收它。但是当我尝试从 Python 中观察某些东西时,它是沉默的。有没有办法从 Python 或其他语言使用 MassTransit?示例赞赏。

出版商:

builder.Register(c => ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    sbc.UseBsonSerializer();
    sbc.UseLog4Net();

    sbc.ReceiveFrom("rabbitmq://localhost/masstransit");
});

.NET 客户端:

public void Execute(IJobExecutionContext context) {
   using (var scope = ServiceLocator.Current.GetInstance<ILifetimeScope>().BeginLifetimeScope()) {
       var log = scope.Resolve<ILog>();
       log.Debug("Sending queue message");

       var bus = scope.Resolve<IServiceBus>();
       bus.Publish(new SimpleTextMessage{Text = "some text"});
   }
}

Python 客户端:

import pika
print('Stating consumer')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='python_consumer_1')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='python_consumer_1')
channel.start_consuming()

来自 C# 应用程序的跟踪:

Configuration Result:
[Success] Name MyApp
[Success] ServiceName MyApp
Topshelf v3.1.122.0, .NET Framework v4.0.30319.34003
INFO (MassTransit.BusConfigurators.ServiceBusConfiguratorImpl) 209  - MassTransit v2.9.2/v2.9.0.0, .NET Framework v4.0.30319.34003
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 245  - CreatingRabbitMQ connection: rabbitmq://localhost/groups_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 246  - Using default configurator for connection: rabbitmq://localhost/groups_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 251  - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 921  - Creating RabbitMQ connection: rabbitmq://localhost/groups
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 922  - Using default configurator for connection: rabbitmq://localhost/groups
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 924  - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.ServiceContainer) 1056 - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService
DEBUG(MassTransit.ServiceContainer) 1062 - Starting bus service: MassTransit.Subscriptions.SubscriptionBusService
DEBUG(MassTransit.Threading.ThreadPoolConsumerPool) 1080 - Starting Consumer Pool for rabbitmq://localhost/groups
[Topshelf.Quartz] Scheduled Job: DEFAULT.ea637337-950a-4281-99c0-f10b842814c9
[Topshelf.Quartz] Job Schedule: Trigger 'DEFAULT.8a1d0b7c-d670-440b-974f-31ec8be6f294':  triggerClass: 'Quartz.Impl.Triggers.SimpleTriggerImpl calendar: '' misfireInstruction: 0 nextFireTime: 01/28/2014 07:12:35 +00:00 - Next Fire Time (local): 28.01.2014 9:12:35 +02:00
[Topshelf.Quartz] Scheduler started...
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1248 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1250 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
DEBUG(Global) 1254 - Sending queue message
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1254 - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1272 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1273 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1277 - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Messages) 1439 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-961c-08d0ea0f744a:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(MassTransit.Messages) 1441 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-8965-08d0ea0f744b:MyApp.Transit.SimpleTextMessage, MyApp
The MyApp service is now running, press Control+C to exit.

DEBUG(Global) 21212 - Sending queue message
DEBUG(MassTransit.Messages) 21214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d4fb-08d0ea0f77c4:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 41213 - Sending queue message
DEBUG(MassTransit.Messages) 41214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-077b-08d0ea0f7b40:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 61212 - Sending queue message
DEBUG(MassTransit.Messages) 61214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-2bed-08d0ea0f7ebb:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 81213 - Sending queue message
DEBUG(MassTransit.Messages) 81215 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-5f44-08d0ea0f8236:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 101212 - Sending queue message
DEBUG(MassTransit.Messages) 101214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-80f8-08d0ea0f85b1:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 121212 - Sending queue message
DEBUG(MassTransit.Messages) 121213 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-a971-08d0ea0f892c:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 141212 - Sending queue message
DEBUG(MassTransit.Messages) 141214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d53e-08d0ea0f8ca7:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 161212 - Sending queue message
DEBUG(MassTransit.Messages) 172109 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-7504-08d0ea0f9208:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 181212 - Sending queue message
DEBUG(MassTransit.Messages) 193461 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-dd26-08d0ea0f95bf:MyApp.Transit.SimpleTextMessage, MyApp

【问题讨论】:

    标签: c# python .net rabbitmq masstransit


    【解决方案1】:

    看来,最简单的方法是将 python 队列绑定到 RabbitMq 管理中的交换。完成后,我已成功收到消息。

    PyhonConsumer 现在看起来如下:

    import pika
    
    print('Stating consumer')
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare('python_consumer_1')
    
    print ' [*] Waiting for messages. To exit press CTRL+C'
    
    def callback(ch, method, properties, body):
        print " [x] Received %r" % (body,)
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.queue_bind(queue='python_consumer_1', exchange='MyApp.Transit:SimpleTextMessage')
    channel.basic_consume(callback, queue='python_consumer_1')
    channel.start_consuming()
    

    【讨论】:

    • 是的,我已经设法提供了一个 PoC。感谢您提供想法。
    【解决方案2】:

    如果您要使用其他语言的消息,则需要查看 MassTransit 发布消息时如何创建交换。然后,您需要将这些交换绑定到您的队列,以便将消息传递给您的订阅者。

    对于你的 Python 代码,你需要

    exchange_bind(".....:SimpleTextMessage", "phython_consumer_1")
    

    完成此操作后,消息将被传递到您的队列。您正在使用 BSON,为什么不使用 JSON 或 python 可以轻松使用的东西?老实说,我不确定 Python 是否支持 BSON,只是想提供其他建议。

    【讨论】:

    • 我收到一个错误:ChannelClosed: (404, "NOT_FOUND - no exchange 'python_consumer_1' in vhost '/'") 尝试调用时:channel.exchange_bind('MyApp.Transit:SimpleTextMessage', 'python_consumer_1') 在 basic_consume 之前或之后.. 将使用来自 masstrasit 的跟踪更新问题。
    • 我还删除了 BSON 序列化程序并尝试将其替换为 JSON,但 python 部分仍然无声且无法获取消息。
    【解决方案3】:

    您可以使用 rabbitmqctl 管理实用程序检查现有的 rabbitmq 交换、队列,然后使用它的结果进行播放。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-09-08
      • 1970-01-01
      • 2012-02-21
      • 2016-03-20
      相关资源
      最近更新 更多