【问题标题】:How can I return an observable with a value that's in a callback?如何返回一个带有回调值的 observable?
【发布时间】:2021-02-10 02:25:34
【问题描述】:

我正在编写一个服务,我打算存储 Place 对象的本地副本,并仅在它们未本地存储时从后端获取它们。但是,我在实现此功能时遇到了麻烦。如果来自place() 的值是undefined,我可以将我的页面设置为调用fetchPlace(),但我打算将fetchPlace() 保持私有,以便我以后可以实现一个系统来检查是否有请求是最近制作的,这样如果用户快速切换页面,服务器就不会被请求淹没。

places.service.ts

export class PlacesService {
  private _places = new BehaviorSubject<Place[]>([]);
  get places() {
    return this._places.asObservable();
  }

  constructor(private _http: HttpClient) {}

  place(placeId: number): Observable<Place> {
    return this._places.pipe(
      take(1),
      map((places: Place[]) => {
        console.log(places);
        let place = places.find((place: Place) => place.id === placeId);

        if (place === undefined) {
          console.log('Time to send a request!');
          this.fetchPlace(placeId).subscribe(
            (fetchedPlace: Place) => {
              console.log('We got one!');
              place = fetchedPlace;
              console.log(place);
            },
            (error) => {
              console.error('Looks like a 404.');
            },
          );
        }

        console.log('Okay, returning place now!');
        return place;
      }),
    );
  }

  private fetchPlace(placeId: number): Observable<Place> {
    return this._http
      .get<Place.ResponseBody>(`http://localhost:8000/v1/places/${placeId}/`)
      .pipe(map((response: Place.ResponseBody) => Place.create(response)));
  }
}

上面代码的问题是,当变量placeundefined时,对fetchPlace()的订阅被异步调用,所以placeplace的值之前返回被fetchedPlace 覆盖。我想要某种方式从 place() 函数返回包含 place 的 observable。

为了完整起见,这里是上面代码的调用方式,以及控制台输出:

place-detail.page.ts

ngOnInit() {
  this._route.paramMap.subscribe((paramMap: ParamMap) => {
    if (!paramMap.has('placeId')) {
      this._navCtrl.navigateBack('/places/discover');
      return;
    }

    const placeId = +paramMap.get('placeId');
    this._placesSub = this._placesSrv.place(placeId).subscribe(
      (place: Place) => {
        if (place === undefined) {
          console.log('Got here.');
        } else {
          this._isBookable = place.user !== this._authSrv.user;
          this._place = place;
        }
      },
      (error) => {
        console.error(error);
      }
    );
  });
}

控制台

Angular is running in development mode. Call enableProdMode() to enable production mode. core.js:26833
Native: tried calling StatusBar.styleDefault, but Cordova is not available. Make sure to include cordova.js or run in a device/simulator common.js:284
Native: tried calling SplashScreen.hide, but Cordova is not available. Make sure to include cordova.js or run in a device/simulator common.js:284
[WDS] Live Reloading enabled. client:52
Array []
places.service.ts:81:16
Time to send a request! places.service.ts:85:18
Okay, returning place now! places.service.ts:98:16
Got here. place-detail.page.ts:75:20
We got one! places.service.ts:88:22
Object { _id: 1, _user: 2, _title: "Manhattan Mansion", _description: "In the heart of New York City.", _imgUrl: "https://www.idesignarch.com/wp-content/uploads/New-York-Fifth-Avenue-Mansion_1.jpg", _price: "149.99", _availableFrom: Date Fri Dec 31 2021 18:00:00 GMT-0600 (Central Standard Time), _availableTo: Date Sat Dec 30 2023 18:00:00 GMT-0600 (Central Standard Time) }
places.service.ts:90:22

【问题讨论】:

    标签: angular rxjs rxjs-observables


    【解决方案1】:

    注意您是如何在可观察流中调用subscribe() 的?通常,我们希望避免这种情况,因为每次发出 observable 时,您都会创建一个新的内部订阅,并且没有真正的好方法来清理它们。

    您要查找的是“Higher Order Mapping Operator”,在本例中为 switchMap

    使用switchMap,您可以将传入的发射映射到 Observable。然后,switchMap 将订阅这个“内部 observable”并发出它的发射。当接收到新的发射时,之前的内部 observable 将被取消订阅并订阅新的。所以,本质上,它允许你“切换”一个可观察的源。

    在您的情况下,您有两个可能的来源,要么是现有项目 (place.find(...)),要么是新获取的结果 (this.fetchPlace(placeId))。

    由于 switchMap 中的代码必须返回一个 observable,所以返回 fetchPlace(placeId) 没问题,因为它返回一个 observable。但是,现有的项目不是可观察的,所以我们必须用of 包裹以将其变成一个。

    这是您的代码使用switchMap 的样子:

    place(placeId: number): Observable<Place> {
        return this._places.pipe(
          switchMap((places: Place[]) => {
            const place = places.find(place => place.id === placeId);
    
            return place ? of(place) : this.fetchPlace(placeId);
          }),
        );
      }
    

    另外,请注意我删除了take(1)。我想你不想要那个。原因如下:使用 observables 的主要优点是消费者总是可以被推送到最新的值。 take(1) 基本上只提供一个值。所以,如果你要实现你提到的缓存过期,如果组件 A 订阅 place(1) 并且它变得陈旧,然后组件 B 订阅导致重新获取,你希望组件 A 接收新获取的值,对吗?

    【讨论】:

      【解决方案2】:

      看起来您想要缓存 API 调用,或者在 RxJs 世界中 - 与未来的订阅者共享源 Observable(而不是再次订阅它)。这是一个干净的例子:

      service.ts:

      cache: { [key:string]: Observable<any> } = {};
      
      constructor(private http: HttpClient) {    }
      
      memoisedGet(url: string) {    
        const source = this.http.get(url).pipe( 
          publishReplay(1),
          refCount()
        );
        this.cache[url] = this.cache[url] || source;
      
        return this.cache[url];
      }  
      

      component.ts:

      memoisedGet(url: string = 'https://jsonplaceholder.typicode.com/todos/1') {
        this.myService.memoisedGet(url).subscribe(console.log);
      }
      

      有了这个,无论memoisedGet()被调用多少次,它都会触发一个HTTP调用,每次调用都会带来最后一个结果。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-10-07
        • 2021-02-05
        • 2017-11-01
        • 1970-01-01
        • 2019-09-18
        • 1970-01-01
        • 2016-11-27
        • 1970-01-01
        相关资源
        最近更新 更多