【发布时间】:2020-11-23 19:16:00
【问题描述】:
我无法将两个不同 Observables 的发出值组合起来用于 http 请求,然后返回请求的 Observable 以供客户端使用。
一些背景知识:我正在开发 Twitch 扩展程序。他们的生态系统的一部分是扩展通过事件回调接收环境信息。我感兴趣的位于window.Twitch.ext.onAuthorized()和window.Twitch.ext.configuration.onChanged()(如果有兴趣,请参阅此处了解更多详情:https://dev.twitch.tv/docs/extensions/reference#helper-extensions)。
在调用我的后端时,我需要来自上述两个事件的信息。这些不会经常更改(如果有的话),但是在它们都可用之前我无法拨打电话,并且我希望在拨打电话时获得最近提供的值。看起来BehaviorSubjects 很适合这个:
export class TwitchService {
private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
private helper: any;
constructor(private window: Window) {
this.helper = this.window['Twitch'].ext;
this.helper.onAuthorized((auth: TwitchAuth) => {
this.authSubject.next(auth);
});
this.helper.configuration.onChanged(() => {
this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
});
}
onAuthorized(): Observable<TwitchAuth> {
return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
}
onConfig(): Observable<TwitchConfig> {
return this.configSubject.asObservable().pipe(filter((config) => !!config));
}
}
此模型适用于订阅这两个Observables 之一的应用程序部分。我的问题是我找不到一种可行的方法来组合它们并使用两者中最新发出的值来为 http 请求创建一次性 Observable。
这是我目前所拥有的:
type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;
export class BackendService {
constructor(private http: HttpClient, private twitch: TwitchService) {}
private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
let subject = new Subject<T>();
combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
first(), // because we only want to make the http request one time
map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
);
return subject.asObservable();
}
exampleHttpRequest(): Observable<Response> {
return this.httpWithTwitchState((auth, config) =>
// this.url() and this.headers() are private functions of this service
this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
);
}
}
然后,该服务的客户端应该能够通过这个简单的调用发出 http 请求,而无需了解有关事件的任何信息或在它们触发时关心:
this.backend.exampleHttpRequest().subscribe((resp) => {
// do stuff with resp here...but this is never hit
}
根据我的理解,只要输入 Observables 中的任何一个发出新值,combineLatest() 就应该发出新值。但是,在我的应用程序中从未触发map() 内部的调用f(auth, config)。我已经使用断点、console.logs 并密切关注浏览器调试器工具中的“网络”选项卡对其进行了测试。对first() 的调用可能会将其丢弃,但如果事件再次触发,我不想重复 http 请求,原因很明显。
提前感谢您的任何建议或指点!
【问题讨论】:
-
尽管 RxJS 有一些非惯用用法,但它看起来应该可以工作。你确定
helper.onAuthorized和helper.configuration.onChanged回调实际上是在TwitchService中触发的。我可以想象一个例子,助手在 TwitchService 被实例化之前发出了它的事件。 -
@amakhrov 好主意 - 我用一些断点对其进行了测试。
TwitchService构造函数首先被击中,然后httpWithTwitchState()被击中(因为另一个组件正在其ngOnInit()中进行http 调用),然后两个事件都在TwitchService中触发。这符合我预期的事件顺序。你有什么我能读到的关于 RxJS 的惯用用法吗?也许如果我清理它,问题就会自行解决。
标签: angular typescript rxjs twitch rxjs-observables