【发布时间】:2021-08-02 12:03:09
【问题描述】:
如果具有相同密钥的消费者已经订阅,则有一个共享订阅的 SSE 端点。如果有活动订阅,则正在从另一个客户端轮询数据。
问题在于外部订阅似乎永远不会捕获错误并将其委托给路由器以关闭与客户端的连接:轮询停止,但连接保持活动状态。
我认为问题在于我如何开始要共享的订阅...但我目前想不出另一种方法来解决这个问题。
路由器(SSE)/外部订阅:
...
const clientId = Date.now();
const newClient = {
id: clientId,
res,
};
clients.push(newClient);
const sub = subscriptionService.listenToChanges(req.context, categoryIds).subscribe({
next: (data) => {
if (JSON.stringify(data) !== '{}') {
newClient.res.write(`data: ${JSON.stringify(data)}\n\n`);
} else {
newClient.res.write(': poke...\n\n');
}
},
error: () => {
// we never get here...
next(new InternalError());
clients = clients.filter((c) => c.id !== clientId);
res.end();
},
complete: () => {
res.end();
clients = clients.filter((c) => c.id !== clientId);
},
});
req.on('close', () => {
subscriptionService.stopListening(req.context);
sub.unsubscribe();
clients = clients.filter((c) => c.id !== clientId);
});
...
订阅服务
...
@trace()
public listenToChanges(ctx: Context, ids: string[]): Observable<{ [key: string]: Data }> {
const key = ctx.user?.email || ClientTypeKey.Anon;
try {
if (this.pool$[key]) {
return this.pool$[key];
}
this.poolSource[key] = new BehaviorSubject<{ [p: string]: Data }>({});
this.pool$[key] = this.poolSource[key].asObservable();
this.fetchData(ctx, ids);
return this.pool$[key].pipe(
catchError((e) => throwError(e)), // we never get here...
);
} catch (e) {
throw new Error(`Subscription Service Listen returned an error: "${e}"`);
}
}
...
private fetchData(ctx: Context, ids: string[]): void {
const key = ctx.user?.email || ClientTypeKey.Anon;
const sub = this.service.getData(ctx, ids)
.pipe(
catchError((e) => throwError(e)),
).subscribe(
(r) => this.poolSource[key].next(r),
(e) => throwError(e), // last time the error is caught
);
this.subscriptions[key] = sub;
}
...
投票服务
...
@trace()
public getData(ctx: Context, ids: string[]): Observable<{[key: string]: Data}> {
try {
const key = ctx.user?.email || ClientTypeKey.Anon;
const pollingInterval = config.get('services.pollingInterval') || 10000;
return interval(pollingInterval).pipe(
startWith(0),
switchMap(() => this.getConfig(ctx, !!this.cachedData[key])),
map((r) => this.getUpdatedData(ctx, r.data, ids)),
catchError((e) => throwError(e)),
);
} catch (e) {
throw new Error(`Get Data returned an error: "${e}"`);
}
}
...
【问题讨论】:
标签: javascript node.js typescript rxjs