【问题标题】:Angular2/Websocket: how to return an observable for incoming websocket messagesAngular2/Websocket:如何为传入的 websocket 消息返回一个 observable
【发布时间】:2016-08-19 11:31:18
【问题描述】:

我将使用 Angular2 接收 websocket 传入消息并根据收到的消息更新网页。现在,我正在使用一个虚拟的 echo websocket 服务,并将替换它。

据我了解,接收 websocket 消息的函数必须返回一个由处理程序订阅的可观察对象,该处理程序将更新网页。但我不知道如何返回一个 observable。

代码sn-p附在下面。 MonitorService 创建一个 websocket 连接并返回一个包含接收到的消息的 observable。

@Injectable()
export class MonitorService {

    private actionUrl: string;
    private headers: Headers;
    private websocket: any;
    private receivedMsg: any;
    constructor(private http: Http, private configuration: AppConfiguration) {

        this.actionUrl = configuration.BaseUrl + 'monitor/';
        this.headers = new Headers();
        this.headers.append('Content-Type', 'application/json');
        this.headers.append('Accept', 'application/json');
    }

    public GetInstanceStatus = (): Observable<Response> => {
        this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
        this.websocket.onopen =  (evt) => {
            this.websocket.send("Hello World");
        };

        this.websocket.onmessage = (evt) => { 
            this.receivedMsg = evt;
        };

        return new Observable(this.receivedMsg).share();
    }

}

下面是另一个组件,它订阅从上面返回的 observable 并相应地更新网页。

export class InstanceListComponent {
  private instanceStatus: boolean
  private instanceName: string
  private instanceIcon: string
  constructor(private monitor: MonitorService) { 
    this.monitor.GetInstanceStatus().subscribe((result) => {
        this.setInstanceProperties(result);
    });
  }

  setInstanceProperties(res:any) {
    this.instanceName = res.Instance.toUpperCase();
    this.instanceStatus = res.Status;
    if (res.Status == true)
    {
      this.instanceIcon = "images/icon/healthy.svg#Layer_1";
    } else {
      this.instanceIcon = "images/icon/cancel.svg#cancel";
    }
  }
}

现在,我在浏览器控制台中遇到了这个错误 TypeError: this._subscribe is not a function

【问题讨论】:

    标签: websocket angular observable


    【解决方案1】:

    我把它放在plunker 上,并添加了一个用于向 Websocket 端点发送消息的函数。这是重要的编辑:

    public GetInstanceStatus(): Observable<any>{
        this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
        this.websocket.onopen =  (evt) => {
            this.websocket.send("Hello World");
        };
        return Observable.create(observer=>{
            this.websocket.onmessage = (evt) => { 
                observer.next(evt);
            };
        })
        .share();
    }
    

    更新
    正如您在评论中提到的,更好的替代方法是使用Observable.fromEvent()

    websocket = new WebSocket("ws://echo.websocket.org/");
    public GetInstanceStatus(): Observable<Event>{
        return Observable.fromEvent(this.websocket,'message');
    }
    

    plunker exampleObservable.fromEvent()

    此外,您可以使用WebSocketSubject 来完成,不过,它看起来还没有准备好(从 rc.4 开始):

    constructor(){
      this.websocket = WebSocketSubject.create("ws://echo.websocket.org/");
    }
    
    public sendMessage(text:string){
      let msg = {msg:text};
      this.websocket.next(JSON.stringify(msg));
    } 
    

    plunker example

    【讨论】:

    • 谢谢!而且,事实证明,像这样返回 observable 也可以return Observable.fromEvent(websocket, 'message');你对它们的差异有什么想法吗?
    • @BingLu 他们几乎一样。 fromEvent 方式只是事件的包装,这就是我在回答中所做的。我怀疑有什么区别。但是,这仍然比我的答案更好:D。我会更新我的答案以包含它。谢谢
    • @Abdulrahman 我知道那是很久以前的事了,但我想知道您是否可以更新 Plunker 以使用 fromEvent。顺便说一句,WebSocketSubject 不应该用于流数据吗? (我只是想知道,我是 websockets 的新手)。
    • @drbishop 我更新了我的答案并为fromEventWebSocketSubject 添加了两个新的plunker 示例。关于使用WebSocketSubject,我只是从你那里知道的,它似乎是新的,还没有准备好。一旦准备好,它将是我的首选。
    【解决方案2】:

    从套接字获取 onMessage 数据。

    import { Injectable } from '@angular/core';
    import {Observable} from 'rxjs/Rx';
    
    
    @Injectable()
    
    export class HpmaDashboardService {
    private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws';
    private websocket: any;      
    
    public GetAllInstanceStatus(objStr): Observable<any> {
          this.websocket = new WebSocket(this.socketUrl);
          this.websocket.onopen =  (evt) => {
            this.websocket.send(JSON.stringify(objStr));
          };
          return Observable.create(observer => {
            this.websocket.onmessage = (evt) => {
              observer.next(evt);
            };
          }).map(res => res.data).share();
     }
    
    **Get only single mesage from socket.**
    
        public GetSingleInstanceStatus(objStr): Observable<any> {
          this.websocket = new WebSocket(this.socketUrl);
          this.websocket.onopen =  (evt) => {
            this.websocket.send(JSON.stringify(objStr));
          };
          return Observable.create(observer => {
            this.websocket.onmessage = (evt) => {
              observer.next(evt);
              this.websocket.close();
            };
          }).map(res => res.data).share();
      }
    
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-01
      • 2021-10-02
      • 2019-04-22
      • 1970-01-01
      • 2019-07-18
      • 2015-10-04
      • 1970-01-01
      • 2015-07-19
      相关资源
      最近更新 更多