【问题标题】:spring boot and BlockingQueue listenerspring boot 和 BlockingQueue 监听器
【发布时间】:2018-08-22 18:13:34
【问题描述】:

我已经用spring boot实现了jms,我正在使用@JmsListener来监听主题

  @Component
    public class AMQListner {
        BlockingQueue<MessageBO> queue = new ArrayBlockingQueue<>(1024);
        @JmsListener(destination = "${spring.activemq.topic}")
        public void Consume(TextMessage message) {
            try {
                String json = message.getText();
                MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
                queue.add(bo);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (JsonParseException e) {
                e.printStackTrace();
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

现在我想要一个侦听器来侦听该阻塞队列,如果它具有 value ,则处理。我们可以在 Spring Boot 中使用注释来实现这一点吗?

【问题讨论】:

  • 您是否特别需要 Spring-Boot 解决方案?纯 Java 的实现看起来很简单,我不知道 Spring Boot 会如何简化它。
  • @daniu 首先寻找弹簧特定的解决方案。否则需要使用与spring boot有任何冲突的纯java实现。

标签: java spring spring-boot activemq listener


【解决方案1】:

首先,正确的方法是创建一个处理程序 bean,而不是在接收器类中拥有消息队列的成员。

public interface MessageHandler extends Consumer<MessageBO> {
    public default void handle(MessageBO msg) { accept(msg); }
}

@Component
public class AMQListener {
    @Resource("multiplexer")
    MessageHandler handler;

    @JmsListener(destination = "${spring.activemq.topic}")
    public void Consume(TextMessage message) {
        try {
            String json = message.getText();
            MessageBO bo = ObjectMapperConfig.getInstance().readValue(json, MessageBO.class);
            handler.handle(bo);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

然后,您将在处理程序 bean 中拥有队列

@Component("multiplexer")
public class MessageMultiplexer implements MessageHandler {
    @Autowired
    MessageHandler actualConsumer;

    ExecutorService executor = Executors.newFixedThreadPool(4);
    public void accept(MessageBO msg) {
        executor.submit(msg -> actualConsumer.handle(msg));
    }
}

在这种情况下,Executor 几乎就是队列。

警告:您没有这种方式的 1024 限制。您可以通过使用ThreadPoolExecutor 构造函数并传递给它一个有限的队列来做到这一点。

【讨论】:

  • executor.submit(msg -&gt; actualConsumer.handle(msg)); 会解析 lambda 表达式吗?
  • @ShahidGhafoor 不确定您的意思;这是一个Runnable,当执行程序有一个线程留给它时运行。在本例中,将有四个线程同时处理消息;你可以使用Executors.newSingleThreadPool()来一一处理。
  • @ShahidGhafoor ... 是的 lambda 表达式 msg -&gt; actualConsumer.handle(msg) 将得到解决。
猜你喜欢
  • 1970-01-01
  • 2015-04-18
  • 2021-10-15
  • 1970-01-01
  • 2021-03-11
  • 2020-10-28
  • 2020-12-23
  • 2017-01-23
  • 1970-01-01
相关资源
最近更新 更多