【问题标题】:wso2 message broker client for pythonpython的wso2消息代理客户端
【发布时间】:2018-01-11 23:14:30
【问题描述】:

我想使用 python 客户端消费和发布消息到 wso2 消息代理。找了很多,没找到专门为wso2消息代理设计的python客户端。

虽然我开始知道适用于 rabbitmq 的 pika 库可以适用于 wso2 消息代理。

所以我写了一个代码来发布消息到 wso2 队列。我在 wso2 消息代理上创建了一个测试队列,并尝试使用 pika 库发布消息。

import pika

params = pika.URLParameters("amqp://admin:admin@localhost:5672/%2F")
connection = pika.BlockingConnection(params)
channel = connection.channel()
# channel.queue_declare(queue="testqueue", durable=True, exclusive=False, auto_delete=False)
if channel.basic_publish(exchange='', routing_key='testqueue',
                         body='New message for testing',
                         properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1),
                         mandatory=True):
    print(" Message was published sucessfully")
else:
    print("message could not be published")

显示消息已发布但未发布。但是在 wso2 消息代理中,我收到控制台错误。

[ Sequence: 24976 ] Exception occurred while processing inbound events.Event type: MESSAGE_EVENT
java.lang.NullPointerException
    at java.util.HashSet.<init>(HashSet.java:118)
    at org.wso2.andes.kernel.router.QueueMessageRouter.getMatchingStorageQueues(QueueMessageRouter.java:88)
    at org.wso2.andes.kernel.disruptor.inbound.MessagePreProcessor.preProcessIncomingMessage(MessagePreProcessor.java:214)
    at org.wso2.andes.kernel.disruptor.inbound.MessagePreProcessor.updateRoutingInformation(MessagePreProcessor.java:190)
    at org.wso2.andes.kernel.disruptor.inbound.MessagePreProcessor.onEvent(MessagePreProcessor.java:75)
    at org.wso2.andes.kernel.disruptor.inbound.MessagePreProcessor.onEvent(MessagePreProcessor.java:49)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

    标签: python wso2 messagebroker


    【解决方案1】:

    以上代码缺少交换(变为空白),而是添加了amq.direct 以使其正常工作。

    channel.publish(exchange='amq.direct', 
            routing_key='testqueue', 
            body='Hello World!',
            properties=pika.BasicProperties(content_type='text/plain', delivery_mode=1)
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-11-23
      • 1970-01-01
      • 1970-01-01
      • 2015-07-17
      • 2016-03-07
      • 2019-04-08
      • 1970-01-01
      相关资源
      最近更新 更多