【问题标题】:How do I use RxJS & Observables to do an http request only after multiple other events have fired?仅在多个其他事件触发后,如何使用 RxJS 和 Observables 执行 http 请求?
【发布时间】:2020-11-23 19:16:00
【问题描述】:

我无法将两个不同 Observables 的发出值组合起来用于 http 请求,然后返回请求的 Observable 以供客户端使用。

一些背景知识:我正在开发 Twitch 扩展程序。他们的生态系统的一部分是扩展通过事件回调接收环境信息。我感兴趣的位于window.Twitch.ext.onAuthorized()window.Twitch.ext.configuration.onChanged()(如果有兴趣,请参阅此处了解更多详情:https://dev.twitch.tv/docs/extensions/reference#helper-extensions)。

在调用我的后端时,我需要来自上述两个事件的信息。这些不会经常更改(如果有的话),但是在它们都可用之前我无法拨打电话,并且我希望在拨打电话时获得最近提供的值。看起来BehaviorSubjects 很适合这个:

export class TwitchService {
  private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
  private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
  private helper: any;

  constructor(private window: Window) {
    this.helper = this.window['Twitch'].ext;
    this.helper.onAuthorized((auth: TwitchAuth) => {
      this.authSubject.next(auth);
    });
    this.helper.configuration.onChanged(() => {
      this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
    });
  }

  onAuthorized(): Observable<TwitchAuth> {
    return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
  }

  onConfig(): Observable<TwitchConfig> {
    return this.configSubject.asObservable().pipe(filter((config) => !!config));
  }
}

此模型适用于订阅这两个Observables 之一的应用程序部分。我的问题是我找不到一种可行的方法来组合它们并使用两者中最新发出的值来为 http 请求创建一次性 Observable

这是我目前所拥有的:

type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;

export class BackendService {
  constructor(private http: HttpClient, private twitch: TwitchService) {}

  private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
    let subject = new Subject<T>();
    combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(), // because we only want to make the http request one time
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
    );
    return subject.asObservable();
  }

  exampleHttpRequest(): Observable<Response> {
    return this.httpWithTwitchState((auth, config) =>
      // this.url() and this.headers() are private functions of this service
      this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
    );
  }
}

然后,该服务的客户端应该能够通过这个简单的调用发出 http 请求,而无需了解有关事件的任何信息或在它们触发时关心:

this.backend.exampleHttpRequest().subscribe((resp) => {
  // do stuff with resp here...but this is never hit
}

根据我的理解,只要输入 Observables 中的任何一个发出新值,combineLatest() 就应该发出新值。但是,在我的应用程序中从未触发map() 内部的调用f(auth, config)。我已经使用断点、console.logs 并密切关注浏览器调试器工具中的“网络”选项卡对其进行了测试。对first() 的调用可能会将其丢弃,但如果事件再次触发,我不想重复 http 请求,原因很明显。

提前感谢您的任何建议或指点!

【问题讨论】:

  • 尽管 RxJS 有一些非惯用用法,但它看起来应该可以工作。你确定helper.onAuthorizedhelper.configuration.onChanged 回调实际上是在TwitchService 中触发的。我可以想象一个例子,助手在 TwitchService 被实例化之前发出了它的事件。
  • @amakhrov 好主意 - 我用一些断点对其进行了测试。 TwitchService 构造函数首先被击中,然后httpWithTwitchState() 被击中(因为另一个组件正在其ngOnInit() 中进行http 调用),然后两个事件都在TwitchService 中触发。这符合我预期的事件顺序。你有什么我能读到的关于 RxJS 的惯用用法吗?也许如果我清理它,问题就会自行解决。

标签: angular typescript rxjs twitch rxjs-observables


【解决方案1】:

好吧,事实证明我遗漏了一些明显的东西。我没有订阅combineLatest() 调用,所以它从未被执行。在最后添加subscribe() 后,效果很好:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  let subject = new Subject<T>();
  combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(),
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
  ).subscribe(); // adding this fixed the problem
  return subject.asObservable();
} 

在那之后,我对如何在这种情况下取​​消嵌套订阅进行了更多研究,这使我找到了flatMap() 运算符。所以现在httpWithTwitchState() 函数看起来像这样:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  return combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
    first(),
    map(([auth, config]) => f(auth, config)),
    flatMap((resp) => resp)
  );
}

现在按预期工作并且更清洁。感谢那些花时间阅读我的问题和/或回复的人。

【讨论】:

  • 好收获!顺便说一句,这是“非惯用 rxjs 代码”的一部分:)。无需在 httpWithTwitchState 中创建新主题。此外,不需要在 map() 操作符内部订阅(这实际上很糟糕 - map 是为数据转换而设计的,而不是为了副作用。
  • 相反,这个方法看起来像:return combineLatest([onAuth, onConfig]).pipe(first(), switchMap(([auth, conf]) =&gt; f(auth, conf)))
【解决方案2】:

你可以把 c​​oncat 想象成 ATM 机上的一条线,下一个交易(订阅)在上一个交易完成之前无法开始!

例如:https://stackblitz.com/edit/typescript-3qsfgk?file=index.ts&devtoolsheight=100

示例:

const mergeValue = concat(authSubject,configSubject) 

mergeValue.subscribe( (x) => {
  // do stuff with resp here
});

【讨论】:

  • 嘿,感谢您的回答 - 但我认为 concat 不适合我的用例。我正在尝试协调两个事件的输出,当它们触发时我无法控制它们,并且只有在它们都触发后才使用每个事件的数据进行 http 调用。
猜你喜欢
  • 1970-01-01
  • 2020-03-26
  • 2021-01-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-05-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多