【发布时间】:2020-12-15 18:12:21
【问题描述】:
我有一种方法可以对数据库进行大量异步查询以检索大量数据。为简单起见,假设每个查询都返回一个整数数组。我想把这个方法变成可观察的,并一一输出数字。这部分工作正常。
问题从“take”运算符开始 - 如果没有人在听结果,我想停止 DB 请求。我的问题是“滚动”函数在达到停止条件之前不会停止执行,即使“大查询承诺”由于 take(10) 运算符而不再监听它。
当订阅者由于各种原因取消订阅时,是否有可能停止 observable 的执行?
let ind = 0;
function dbRequest(): Promise<number[]> {
return new Promise(resolve => resolve([ind++, ind++]));
}
async function largeQuery(index: number) {
let res = await dbRequest();
return new Observable(observer => scroll(observer, res, index));
}
function scroll(observer: Subscriber<number>, res: number[], index: number) {
if (Math.round(Math.random() * 5) === 0) {
console.log(`completed sequence ${index}`);
observer.complete();
return;
}
res.forEach(value => observer.next(value));
dbRequest().then(arr => scroll(observer, arr, index));
}
async function largeQueryPromise(index: number) {
console.log(`started sequence ${index}`);
const obs = await largeQuery(index);
obs.pipe(take(10)).subscribe(
undefined,
console.error,
() => {
console.log(`stopped to listen for sequence ${index}`);
largeQueryPromise(++index).then();
});
}
largeQueryPromise(0).then();
【问题讨论】:
标签: node.js typescript async-await rxjs observable