【问题标题】:Angular 5 Rxjs Subject.subscribe() not triggering in multiple componentsAngular 5 Rxjs Subject.subscribe() 未在多个组件中触发
【发布时间】:2018-05-13 19:08:54
【问题描述】:

我正在使用 rxjs 和主题来更新我的两个组件。

我正在订阅服务中的主题,但是当在主题上调用 .next 方法时,它只会更新我的一个组件。

该应用程序包含一个用于初始化 websocketconnection 的 WebsocketService,一个使用 WebsocketService 连接到后端并发送/接收通知的 NotificationService。

我有一个 NotificationComponent,我可以在其中创建一个新通知。在这个组件中,我订阅了 NotificationService 中的主题,并在通知更新时显示通知。这工作得很好,消息到达后端,并在当前有连接的所有浏览器中更新。

对我来说,下一步是在 HeaderComponent 中显示此通知。我在这里注入了 NotificationService 并订阅了相同的主题,但是当我发送通知时,HeaderComponents 订阅不会触发。 console.log 消息永远不会出现在控制台中。

WebSocket服务

import { Injectable } from '@angular/core';
import { ReplaySubject, Subject, Observable, Observer } from 'rxjs/Rx';

@Injectable()
export class WebsocketService {

  constructor() { }

  private subject: ReplaySubject<MessageEvent>;

  public connect(url): ReplaySubject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log("Successfully connected: " + url);
    }
    return this.subject;
  }

  private create(url): ReplaySubject<MessageEvent> {

    //create connection
    let ws = new WebSocket(url);

    //define observable
    let observable = Observable.create(
      (obs: Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });

    //define observer
    let observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          console.log("---sending ws message---");
          ws.send(JSON.stringify(data));
        }
      }
    };

    return ReplaySubject.create(observer, observable);
  }
}

通知服务

import { Injectable } from '@angular/core';
import { Observable, Subject, ReplaySubject, BehaviorSubject } from 'rxjs/Rx';
import { WebsocketService } from './websocket.service';
import { Notification } from './../model/notification'

const NOTIFICATION_URL = 'ws://localhost:8080/Kwetter/socket'


@Injectable()
export class NotificationService {

  public _notification: ReplaySubject<Notification>;

  constructor(websocketService: WebsocketService) {

    this._notification = <ReplaySubject<Notification>>websocketService
      .connect(NOTIFICATION_URL)
      .map((response: MessageEvent): Notification => {
        let data = JSON.parse(response.data);
        return {
          sender: data.author,
          message: data.message
        }
      });
  }

  sendMessage(notification) {
    console.log("---calling .next()---");
    this._notification.next(notification);
  }
}

通知组件

import { Component, OnInit } from '@angular/core';
import { NotificationService } from '../services/notification.service';
import { UserService } from '../services/user.service';
import { Notification } from './../model/notification';

@Component({
  selector: 'app-notifications',
  templateUrl: './notifications.component.html',
  styleUrls: ['./notifications.component.css']
})
export class NotificationsComponent implements OnInit {

  notification: Notification;
  text: string;

  constructor(private notificationService: NotificationService, private userService: UserService) {

    if (this.notification == null) {
      this.notification = new Notification("", "");
    }
    notificationService._notification.subscribe(notification => {
      console.log("---notification has been updated---")
      this.notification = notification;
    });
  }

  sendMsg() {
    let newNot = new Notification(this.userService.getUser(), this.text);
    this.notificationService.sendMessage(newNot);
  }

  ngOnInit() {
  }

}

标头组件

    import { Component, OnInit, OnDestroy } from '@angular/core';
import { UserService } from '../../services/user.service';
import { NotificationService } from '../../services/notification.service';
import { Router } from '@angular/router';
import { Subscription } from 'rxjs/Subscription';
import { Profile } from '../../model/profile';
import { User } from '../../model/user';
import { Notification } from '../../model/notification';

@Component({
  selector: 'app-header',
  templateUrl: './header.component.html',
  styleUrls: ['./header.component.css']
})
export class HeaderComponent implements OnInit, OnDestroy {

  private notification: Notification;
  private loggedIn = false;
  private user: User;

  private subscription: Subscription;

  constructor(private userService: UserService, private router: Router, private notificationService: NotificationService) {

    console.log("---constructor headercomponent---");
    console.log(this.notification);

    this.notificationService._notification.subscribe(notification => {
      console.log("---header notification has been updated---");
      this.notification = notification;
    });

    if (this.notification == null) {
      this.notification = new Notification("", "");
    }

    this.subscription = this.userService.profile$.subscribe(user => {
      this.user = user;
      if (user !== null) {
        this.loggedIn = true;
      }
      else this.loggedIn = false;
    });

    this.loggedIn = userService.isLoggedIn();
    this.user = userService.getUser();
  }

  logout() {
    this.userService.logout();
    this.router.navigate(['home']);
  }

  home() {
    this.router.navigate(['home']);
  }

  myProfile() {
    console.log("click");
    this.router.navigate(['profile', this.userService.getUser().id]);
  }

  getLoggedIn(): void {
    this.loggedIn = !!this.userService.isLoggedIn();
  }

  ngOnInit() {
    this.getLoggedIn();
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }

}

NotificationComponent 使用 router-outlet 显示,header 组件总是使用 selector 标签显示,但我认为这无关紧要。

<div>
  <app-header></app-header>
  <div class="content">
    <router-outlet></router-outlet>
  </div>
</div>

我发现下面的线程,建议使用 ReplaySubject 以防我在事件触发后订阅(我不认为是这种情况,但我还是尝试了)。这不起作用。

另外,我只有一个 app.module 用于声明提供程序。 由于我对两个组件使用相同的代码,为什么 .subscribe 只能在 NotificationComponent 中工作?

Angular 2: Observable / Subscription not triggering

【问题讨论】:

  • “我只有一​​个 app.module 我声明提供者”所以你没有路由模块?
  • 对不起,我不明白这个问题。我的意思是说我将服务传递给 app.module 中的 providers 数组。我只有一个应用程序模块。我不再在@Component 中的特定组件中提供服务。所以我的服务不应该被实例化两次。
  • “我只有一​​个应用模块”“我只有一​​个模块” 不同。如果例如您的路线是延迟加载的,那么您有多个模块。
  • 在这种情况下,我要导入几个模块:github.com/bvkeersop/KwetterFrontend/blob/master/src/app/… 但是我为 appmodule 提供服务,所以如果我理解正确,那就是在我的组件中注入的那个。

标签: javascript angular rxjs


【解决方案1】:

您看到的行为与 RxJS 的工作方式以及流的创建方式有关。一起来看看WebsocketService

let observable = Observable.create(
  (obs: Observer<MessageEvent>) => {
    ws.onmessage = obs.next.bind(obs);

obs 对于每个订阅都是新的,但ws 始终相同。因此,当您在 NotificationComponent 中第二次订阅时,onmessage 回调仅针对该订阅调用 next。因此只有该组件接收消息。

您可以通过在NotificationComponent 中注释掉notificationService._notification.subscribe 来验证这一点。然后HeaderComponent会收到消息。

一个简单的解决方案是在NotificationService 中添加share 运算符:

this._notification = <ReplaySubject<Notification>>websocketService
  .connect(NOTIFICATION_URL)
  .map((response: MessageEvent): Notification => {
    let data = JSON.parse(response.data);
    return {
      sender: data.author,
      message: data.message
    }
  })
.share();

这意味着.share()的订阅上游将被共享,即(obs: Observer<MessageEvent>) => { ws.onmessage = obs.next.bind(obs);只会被调用一次,两个组件都会收到消息。

顺便说一句:RxJs 提供support for websockets。您可以使用Observable.webSocket(url); 创建一个流并去掉一些代码。

【讨论】:

  • 完美的答案,很好的解释!谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-04-02
  • 1970-01-01
  • 2018-06-24
  • 1970-01-01
  • 1970-01-01
  • 2020-07-25
  • 2019-03-08
相关资源
最近更新 更多