【问题标题】:Node JS SSE: RxJs observable error gets ignored while sharing a subscriptionNode JS SSE:共享订阅时忽略 RxJs 可观察到的错误
【发布时间】:2021-08-02 12:03:09
【问题描述】:

如果具有相同密钥的消费者已经订阅,则有一个共享订阅的 SSE 端点。如果有活动订阅,则正在从另一个客户端轮询数据。

问题在于外部订阅似乎永远不会捕获错误并将其委托给路由器以关闭与客户端的连接:轮询停止,但连接保持活动状态。

我认为问题在于我如何开始要共享的订阅...但我目前想不出另一种方法来解决这个问题。

路由器(SSE)/外部订阅:

...

const clientId = Date.now();
const newClient = {
    id: clientId,
    res,
};
clients.push(newClient);

const sub = subscriptionService.listenToChanges(req.context, categoryIds).subscribe({
    next: (data) => {
        if (JSON.stringify(data) !== '{}') {
            newClient.res.write(`data: ${JSON.stringify(data)}\n\n`);
        } else {
            newClient.res.write(': poke...\n\n');
        }
    },
    error: () => {
        // we never get here...
        next(new InternalError());
        clients = clients.filter((c) => c.id !== clientId);
        res.end();
    },
    complete: () => {
        res.end();
        clients = clients.filter((c) => c.id !== clientId);
    },
});

req.on('close', () => {
    subscriptionService.stopListening(req.context);
    sub.unsubscribe();
    clients = clients.filter((c) => c.id !== clientId);
});

...

订阅服务

...

@trace()
public listenToChanges(ctx: Context, ids: string[]): Observable<{ [key: string]: Data }> {
    const key = ctx.user?.email || ClientTypeKey.Anon;
    try {
        if (this.pool$[key]) {
            return this.pool$[key];
        }


        this.poolSource[key] = new BehaviorSubject<{ [p: string]: Data }>({});
        this.pool$[key] = this.poolSource[key].asObservable();

        this.fetchData(ctx, ids);

        return this.pool$[key].pipe(
            catchError((e) => throwError(e)), // we never get here...
        );
    } catch (e) {
        throw new Error(`Subscription Service Listen returned an error: "${e}"`);
    }
}

...

private fetchData(ctx: Context, ids: string[]): void {
    const key = ctx.user?.email || ClientTypeKey.Anon;

    const sub = this.service.getData(ctx, ids)
        .pipe(
            catchError((e) => throwError(e)),
        ).subscribe(
            (r) => this.poolSource[key].next(r),
            (e) => throwError(e), // last time the error is caught
        );
    this.subscriptions[key] = sub;
}
...

投票服务

...

@trace()
public getData(ctx: Context, ids: string[]): Observable<{[key: string]: Data}> {
    try {
        const key = ctx.user?.email || ClientTypeKey.Anon;
        const pollingInterval = config.get('services.pollingInterval') || 10000;

        return interval(pollingInterval).pipe(
            startWith(0),
            switchMap(() => this.getConfig(ctx, !!this.cachedData[key])),
            map((r) => this.getUpdatedData(ctx, r.data, ids)),
            catchError((e) => throwError(e)),
        );
    } catch (e) {
        throw new Error(`Get Data returned an error: "${e}"`);
    }
}
...

【问题讨论】:

    标签: javascript node.js typescript rxjs


    【解决方案1】:

    throwError 实际上不会抛出错误,而是创建一个发出错误的 observable。

    来自文档:

    [throwError] 创建一个 observable,它将创建一个错误实例,并在订阅后立即将其作为错误推送给消费者。

    这就是为什么在 subscribe 中使用它不能按预期工作的原因。你应该简单地抛出:

    .subscribe(
      (r) => this.poolSource[key].next(r),
      (e) => throw new Error(e)
    );
    

    您调用fetchData() 的方式似乎有一些不必要的复杂性,以便订阅并将结果推送到BehaviorSubject。我不知道您的所有要求,但您似乎根本不需要BehaviorSubject

    您可以简单地返回 observable 并将其添加到您的 pool$ 数组中,而不是订阅 fetchData(),或者甚至完全摆脱 fetchData()

    public listenToChanges(ctx: Context, ids: string[]): Observable<{ [key: string]: Data }> {
        const key = ctx.user?.email || ClientTypeKey.Anon;
        try {
            if (!this.pool$[key]) {
                this.pool$[key] = this.service.getData(ctx, ids).pipe(
                    catchError((e) => throwError(e))
                );
            }
    
            return this.pool$[key];
        } catch (e) {
            throw new Error(`Subscription Service Listen returned an error: "${e}"`);
        }
    }
    

    注意事项:

    • 有了上面的简化,也许你不再需要外层的try/catch了
    • 这不是一个完整的解决方案,可能需要在代码的其他地方进行一些调整。我只是想指出,这似乎是不必要的复杂性。

    【讨论】:

    • 感谢您的回答,但我已经尝试过抛出错误的方法,它为我关闭了整个 nodejs 后端。不知何故,它似​​乎被视为一个未捕获的错误,尽管我确实抓住了它。可观察部分非常有趣,但它不允许我以我想要的方式在客户端之间共享订阅。当我们有两个相同键的侦听器并且其中一个取消订阅时 -> 另一个也取消订阅,但他应该继续积极获取更新。看来我无法从 rxjs 按照您建议的方式实现它的有效订阅数。
    猜你喜欢
    • 2018-04-29
    • 2018-07-28
    • 2021-06-17
    • 1970-01-01
    • 2017-04-13
    • 2018-03-24
    • 2018-06-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多