【问题标题】:RxJS interval, AJAX request and reduceRxJS 间隔,AJAX 请求和减少
【发布时间】:2019-08-12 16:44:41
【问题描述】:

我试图弄清楚如何在映射数据并使用 RxJS 运算符减少数据时每五秒发送一次 AJAX 请求。

因此,有一个简单的 Angular 服务为目前在太空中的宇航员请求外部 API:http://api.open-notify.org/astros.json

在服务中,我尝试先设置interval(5000),然后使用RxJS映射运算符将传入的号码映射到GET请求。

任务是这样的:我需要先找到宇航员,然后找到在 ISS 上飞行的人,然后再找到可以渲染成视图的宇航员。数据必须每 5 秒更新一次;所以我需要每 5 秒重新发送一次相同的 HTTP 请求,我可以使用 setInterval() 来完成。这不是解决问题的最干净的方法。

代码如下:

import { Injectable } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError, interval } from 'rxjs';
import { catchError, filter, concatMap, flatMap, map, reduce } from 'rxjs/operators';
import { Astronaut } from '../models/astronaut.model';

const astronautsUri = 'http://api.open-notify.org/astros.json';

@Injectable({
  providedIn: 'root'
})
export class IssLocatorService {
  constructor(private http: HttpClient) {}

  getAstronauts(): Observable<Astronaut[]> {
    return interval(5000).pipe(
      concatMap(num => this.http.get(astronautsUri)),
      flatMap((newAstronauts: any) => newAstronauts.people),
      filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
      map((astronaut: Astronaut) => [astronaut]),
      reduce((prev, next) => [...prev, ...next]),
      catchError(this.handleError)
    );
  }
}

代码不起作用,唉。 尽管流确实到达了reduce() 运算符,但该运算符不会返回到组件中的subscribe() 方法。

不过,在我看来,该解决方案应该可以正常工作。以下是我认为它的工作原理:

  1. 5 秒后,第一个数字 0 生成,外部 Observable 映射到内部 Observable - AJAX 请求。 concatMap() 一直等到内部 Observable 完成,然后才获得第二个数字 - 1。
  2. 内部 Observable 是一个返回 JSON 的 HTTP 请求。我只需要与此类似的对象中的人:{success:true, people: [...]}),然后我使用flatMap() 将此对象转换为宇航员数组people。由于flatMap() 的工作方式,people 中的每个对象都将成为 Observable。
  3. 我过滤每个宇航员对象。
  4. 我将每个宇航员对象映射到要减少的数组。
  5. 我减少了宇航员数组。换句话说,astronauts.people 得到了复制,这要归功于 reduce()
  6. (这里有问题)reduce(),根据规范,应该返回subscribe,因为内部的Observable已经完成。 但它没有reduce() 等到下一个数字由 interval() 生成并映射到内部 Observable,再次返回 people 并推送到同一个数组。它会一直持续下去。

如果我将reduce() 替换为scan,则数组或宇航员确实返回到subscribe 方法。然而,由于宇航员物体被反复推入其中,这个数组不断变大。

以下方法效果很好:

return this.http.get(astronautsUri).pipe(
  flatMap((newAstronauts: any) => newAstronauts.people),
  filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
  map((astronaut: Astronaut) => [astronaut]),
  reduce((prev, next) => [...prev, ...next]),
  catchError(this.handleError)
);

但是在这种情况下,我必须在渲染宇航员的组件类中手动设置setInterval()的间隔,并且我必须调用getAstronauts()方法。所以,ngOnInit中的方法基本上有两次调用。

如何仅使用 RxJS 运算符来达到预期的效果?我想设置一个区间,映射和过滤并减少对象数组,然后接收它们。

我对 RxJS 映射如何工作的理解非常糟糕,但我尝试了(为了尝试)所有这些方法 - switchMap()concatMap()exhaustMap()flatMap() - 映射来自 @987654351 的数字@ 到 AJAX 请求。还是不行。

【问题讨论】:

    标签: angular rxjs


    【解决方案1】:

    我认为你想要实现的是

    getAstronauts(): Observable<Astronaut[]> {
        return interval(5000).pipe(
          concatMap(num => 
            this.http.get(astronautsUri).pipe(
              flatMap((newAstronauts: any) => newAstronauts.people),
              filter((astronaut: Astronaut) => astronaut.craft === 'ISS'),
              reduce((prev, astronaut) => [...prev, astronaut], []),
            )
          ),
          catchError(this.handleError)
        );
      }
    

    reduce 方法的问题是它在发出任何值之前等待其源 observable 完成。在我的代码源中是一个请求的元素,而有问题的示例源是来自所有请求的所有元素,永远不会结束

    【讨论】:

    • 感谢您的回复。这可能是很多人不喜欢的——嵌套管道。尽管没有其他方法,但我认为。它工作正常,所以再次感谢。
    • 可以在windowTime 运算符的帮助下实现,因为您知道所有这些数组项都将通过 5000 区间。您将获得 1 级嵌套,但如果某些请求需要太长时间才能完成,它可能会有点错误
    【解决方案2】:

    将reduce改为scan,scan与reduce做同样的事情,但在每次迭代时发出,reduce只在整个流完成时发出。

    const { from } = rxjs;
    const { scan } = rxjs.operators;
    
    const obs$ = from([1,2,3,4,5]);
    
    obs$.pipe(scan((total, item) => total + item)).subscribe(val => { console.log(val); });
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"&gt;&lt;/script&gt;

    const { from } = rxjs;
    const { reduce } = rxjs.operators;
    
    const obs$ = from([1,2,3,4,5]);
    
    obs$.pipe(reduce((total, item) => total + item)).subscribe(val => { console.log(val); });
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"&gt;&lt;/script&gt;

    【讨论】:

    • 这不起作用。 scan() 确实返回中间值,但数组一直在变大,因为相同的宇航员对象被推入其中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-26
    相关资源
    最近更新 更多