【问题标题】:Deserializing Blob object from WebSocketSubject从 WebSocketSubject 反序列化 Blob 对象
【发布时间】:2020-10-29 09:04:19
【问题描述】:

我正在尝试使用rxjs 中的内置WebSocketSubject 类。我已经将大部分基本代码设置为 Angular 服务,如下所示:

import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Subscription, from, Observable } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class WebsocketsApiService {
  connection: WebSocketSubject<any>;

  constructor() { 
    this.connection = webSocket({
      url: <websocket URL>,
      closeObserver: {
        next: () => console.log('Sockets disconnected.')
      },
      openObserver: {
        next: () => console.log('Socket connected.')
      }
    });
  }

  connect(observer: (value: any) => void): Subscription {
    return this.connection.subscribe(observer);
  }

  send(data: any) {
    if (this.connection) {
      this.connection.next(data);
    } else {
      console.error('Websockets not connected');
    }
  }

  close() {
    if (this.connection) {
      this.connection.complete();
      this.connection = null;
    }
  }

  ngOnDestroy() {
    this.close();
  }
}

但是,使用此基本代码,它无法反序列化来自 websocket 连接的任何数据。调试后我注意到它以Blob 的形式接收数据,这就是默认解串器函数JSON.parse(data) 失败的原因。

我尝试通过将Blob 数据作为字符串来滚动一个简单的自定义反序列化器函数,但事实证明Blob.text() 是一个Promise,这是我卡住的部分:

constructor() { 
    this.connection = webSocket({
      /// ...
      deserializer: (e) => e.data.text()
    });
}

因此,使用此函数作为反序列化器,订阅此 Websocket 服务的每个组件都将获得一个字符串的 Promise,这会将解析 Promise 的责任推迟给组件。

问题:

  1. 目前的方法是否理想?让每个订阅组件自己解决承诺?
  2. 是否有更简洁的方法,最好是让 websocket 订阅者立即解析 JSON 对象,而不是订阅者必须单独解析字符串?

【问题讨论】:

    标签: angular websocket rxjs


    【解决方案1】:

    您可以使用mergeAll/mergeMapconcatAll/concatMapswitchAll/switchMap 映射到内部 Promise 的值。您可能希望concatAll/concatMap 按它们的进入顺序发出值。

    constructor() { 
      this.connection = webSocket({
        /// ...
        deserializer: (e) => e.data.text()
      });
    }
    
    connect(observer: (value: any) => void): Subscription {
      return this.connection.pipe(
        concatAll()
      ).subscribe(observer);
    }
    

    或者

    constructor() { 
      this.connection = webSocket({
        /// ...
        deserializer: ({data}) => data
      });
    }
    
    connect(observer: (value: any) => void): Subscription {
      return this.connection.pipe(
        concatMap(data => data.text())
      ).subscribe(observer);
    }
    

    【讨论】:

      猜你喜欢
      • 2013-01-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-04-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多