【问题标题】:ActiveMQ Spring Stomp: how can i change my existing code to create persistent subscriptionActiveMQ Spring Stomp:如何更改现有代码以创建持久订阅
【发布时间】:2017-06-09 05:27:10
【问题描述】:

我在我的项目中创建了一个正在运行的通知系统。我的实际代码是:

我的客户(javascript):

let connectWebSocket = () => {
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    });
  });
}
connectWebSocket();

服务器(Java 和 Spring)

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableSimpleBroker("/topic");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/myWebSocketEndPoint")
            .setAllowedOrigins("*")
            .withSockJS();
    }
}

这是有效的。现在我想在用户离线时也向他们发送通知:当他们登录时,我会(自动)向他们发送通知。我必须用 activeMQ 来做这件事。我看过一些例子,但不太了解它们..有人可以告诉我如何准确编辑我的代码并实现持久订阅?非常感谢

编辑:我已经更新了我的客户端代码:

let connectWebSocket = () => {
  let clientId =user.profile.id;
  socket = new SockJS(context.backend + '/myWebSocketEndPoint');
  stompClient = Stomp.over(socket);
  stompClient.connect({"client-id": clientId},{},function (frame) {
    stompClient.subscribe('/topic/notification', function(response){
      alert(response);
    },{"activemq.subscriptionName": clientId});
  });
}

但是当用户离线时,如果通知到达,当他在线返回时,通知不会发送给他..我想我必须改变我的服务器端

POM.xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.2</version>
</dependency>

EDIT2:: 在 pom.xml 中使用正确的依赖项,我现在有一个错误。我有这个配置:

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
   config.enableStompBrokerRelay("/topic/");
}

但是当我运行我的代码时,我看到了这个错误:

2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec

EDIT3:这是我向客户发送通知的方式:

@Component
public class MenuItemNotificationSender {

@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){
    this.messagingTemplate = messagingTemplate;
}

public void sendNotification(MenuItemDto menuItem) {
    messagingTemplate.convertAndSend("/topic/notification", menuItem);
}
}

【问题讨论】:

    标签: spring activemq stomp


    【解决方案1】:

    如果您使用默认 AMQ 配置,这是持久订阅者的默认行为,消息将被持久化, 如果您想在用户离线时也向他们发送通知,您需要使用持久订阅。

    编辑

    STOMP 中的持久消息 STOMP 消息是非持久的 默认。要使用持久消息传递,请将以下 STOMP 标头添加到 所有发送请求:持久性:真。此默认值与 用于 JMS 消息。

    要持久化已发送的消息,在 js 客户端上,您需要在此方法中添加标头:

    stompClient.send(destination,  {"persistent":"true" }, body);
    

    像这样更新您的 MenuItemNotificationSender:

    public void sendNotification(MenuItemDto menuItem) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("JMSDeliveryMode", 2);
        headers.put("persistent", "true");
        messagingTemplate.convertAndSend("/topic/notification", menuItem, headers);
    }
    

    看看

    http://activemq.apache.org/how-do-i-make-messages-durable.html

    http://activemq.apache.org/how-do-durable-queues-and-topics-work.html

    使用 stomp 进行持久订阅:

        stompClient.connect( {"client-id": "my-client-id" },, function ( frame ) {
    
          console.log( 'Connected: ' + frame );
    
          stompClient.subscribe( topic, function ( message ) {
            .....
            .....
          }, {"activemq.subscriptionName": "my-client-id"});
       }, function(frame) {
            console.log("Web socket disconnected");
       });
    

    更新

    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{
    
    
    
    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        //broker.addConnector("tcp://localhost:61616");
        broker.addConnector("stomp://localhost:61613");
        broker.addConnector("vm://localhost");
        PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
        if (!dir.exists()) {
            dir.mkdirs();
        }
        persistenceAdapter.setDirectory(dir);
        broker.setPersistenceAdapter(persistenceAdapter);
        broker.setPersistent(true);
        return broker;
    }
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // if AMQ is running in local not needed to set relayHost & relayPort
        config.enableStompBrokerRelay("/topic/")
       .setRelayHost(relayHost)
       .setRelayPort(relayPort)
       // user pwd if needed
       //.setSystemLogin(activeMqLogin)
       //.setSystemPasscode(activeMqPassword)
       ;
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/myWebSocketEndPoint")
                .setAllowedOrigins("*")
                .withSockJS();
        }
    }
    

    使用父 pom

    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>1.4.3.RELEASE</version>
      <relativePath /> <!-- lookup parent from repository -->
    </parent>
    
    
    
    
    
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-net</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.spring</groupId>
      <artifactId>reactor-spring-context</artifactId>
    </dependency>
    
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-kahadb-store</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-stomp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    

    【讨论】:

      猜你喜欢
      • 2016-11-09
      • 2016-01-14
      • 2015-08-24
      • 1970-01-01
      • 2018-01-15
      • 2017-10-31
      • 2019-09-30
      • 2017-01-24
      • 2014-07-05
      相关资源
      最近更新 更多