【Question title】: Store messages to variable RabbitMQ Java
【Posted】: 2018-09-01 16:47:14
【Question】:

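This is my receive function for messages from RabbitMQ. There is more explanation here: https://www.rabbitmq.com/tutorials/tutorial-one-java.html

This code prints my messages, but I want to store them in a variable.

The problem is that handleDelivery returns void. When I change void to String I get:

"The return type is incompatible with DefaultConsumer.handleDelivery(String, Envelope, AMQP.BasicProperties, byte[])"

Any ideas on how to store the messages in a variable?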

    public String recv() throws Exception
    {
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channelRecv)
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException
            {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                return message;
            }
        };
        channelRecv.basicConsume(queRecv, true, consumer);
    }

Edit: this is the error I get when I run the main program:

    The error is: Exception in thread "main" java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1255)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:471)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:461)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:456)
        at Recv.recv(Recv.java:44)
        at mainLaptop.main(mainLaptop.java:11)
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'Leonardo' in vhost '/', class-id=60, method-id=20)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1253)
        ... 5 more

Here is my code:

    public class Recv
    {
        public static String recv(String ip, String Q) throws Exception
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setUsername("test");
            factory.setPassword("test");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            MyConsumer consumer = new MyConsumer(channel);
            channel.basicConsume(Q, true, consumer);

            return consumer.getStoredMessage();
        }

        public static class MyConsumer extends DefaultConsumer
        {
            private String storedMessage;

            public MyConsumer(Channel channel)
            {
                super(channel);
            }

            public String getStoredMessage()
            {
                return storedMessage;
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException
            {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                storedMessage = message; // store message here
            }
        }
    }

【Question comments】:

    Tags: java windows rabbitmq queue message-queue


    【Solution 1】:

    You can create a custom subclass of DefaultConsumer that stores the received message and exposes it through a getter.

    // Needs java.io.IOException plus AMQP, Channel, DefaultConsumer and Envelope from com.rabbitmq.client.
    public String recv() throws Exception {
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        MyConsumer consumer = new MyConsumer(channelRecv);
        channelRecv.basicConsume(queRecv, true, consumer);

        // basicConsume returns right away, so this is null until a delivery has arrived
        return consumer.getStoredMessage(); // use stored value here
    }

    public class MyConsumer extends DefaultConsumer {
        // volatile: written by the client's delivery thread, read by the caller
        private volatile String storedMessage;

        // DefaultConsumer's constructor takes a Channel, not an Object
        public MyConsumer(Channel channelRecv) {
            super(channelRecv);
        }

        public String getStoredMessage() {
            return storedMessage;
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            storedMessage = message; // store message here
        }
    }
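
One thing to watch with this approach: basicConsume only registers the consumer and returns. The client library calls handleDelivery later, on one of its own threads, so reading the stored value straight away can still give null. Below is a minimal sketch of one way to wait for a message, using only java.util.concurrent so that it runs without a broker. The names BlockingMessageHolder and awaitMessage and the one-argument handleDelivery are hypothetical stand-ins, not part of the RabbitMQ API. In real code the offer call would presumably go inside MyConsumer.handleDelivery, and a plain thread plays the role of the client's delivery thread here.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MyConsumer: it keeps received messages in a
// thread-safe buffer instead of a plain field.
class BlockingMessageHolder {
    // Bounded buffer shared between the delivery thread and the caller.
    private final BlockingQueue<String> messages = new ArrayBlockingQueue<>(16);

    // What the body of handleDelivery would do: decode the payload and hand it off.
    void handleDelivery(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        messages.offer(message); // does not block; the message is dropped if the buffer is full
    }

    // Waits up to timeoutMillis for a message and returns null on timeout.
    String awaitMessage(long timeoutMillis) throws InterruptedException {
        return messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        BlockingMessageHolder holder = new BlockingMessageHolder();
        // Simulate the client library delivering a message from another thread.
        Thread delivery = new Thread(() ->
                holder.handleDelivery("hello".getBytes(StandardCharsets.UTF_8)));
        delivery.start();
        String received = holder.awaitMessage(2000);
        System.out.println(" [x] Stored '" + received + "'");
    }
}
```

With this shape, recv() could return the result of awaitMessage with a timeout instead of reading a field that may not be set yet. Separately, the 404 NOT_FOUND in the question's stack trace says the queue 'Leonardo' does not exist in vhost '/'. The linked tutorial calls channel.queueDeclare(...) before basicConsume, and that call is probably what is missing in the question's code.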
    

    【Comments】:
