【问题标题】:Node js loop of promises to observable对可观察的 Promise 的 Node js 循环
【发布时间】:2020-12-15 18:12:21
【问题描述】:

我有一种方法可以对数据库进行大量异步查询以检索大量数据。为简单起见,假设每个查询都返回一个整数数组。我想把这个方法变成可观察的,并一一输出数字。这部分工作正常。

问题从“take”运算符开始 - 如果没有人在听结果,我想停止 DB 请求。我的问题是“滚动”函数在达到停止条件之前不会停止执行,即使“大查询承诺”由于 take(10) 运算符而不再监听它。

当订阅者由于各种原因取消订阅时,是否有可能停止 observable 的执行?

let ind = 0;
function dbRequest(): Promise<number[]> {
    return new Promise(resolve => resolve([ind++, ind++]));
}

async function largeQuery(index: number) {
    let res = await dbRequest();
    return new Observable(observer => scroll(observer, res, index));
}

function scroll(observer: Subscriber<number>, res: number[], index: number) {
    if (Math.round(Math.random() * 5) === 0) {
        console.log(`completed sequence ${index}`);
        observer.complete();
        return;
    }

    res.forEach(value => observer.next(value));
    dbRequest().then(arr => scroll(observer, arr, index));
}

async function largeQueryPromise(index: number) {
    console.log(`started sequence ${index}`);
    const obs = await largeQuery(index);
    obs.pipe(take(10)).subscribe(
        undefined, 
        console.error, 
        () => {
            console.log(`stopped to listen for sequence ${index}`);
            largeQueryPromise(++index).then();
        });
}

largeQueryPromise(0).then();

【问题讨论】:

    标签: node.js typescript async-await rxjs observable


    【解决方案1】:

    您的 largeQuery 只能使用运算符来完成。当上一个请求发出时,使用expand 递归调用dbRequest()。通过返回EMPTY 结束此递归。使用concatAll 传播传入的数组发出。

    function largeQuery(index: number): Observable<number> {
      console.log("largeQuery2 for", index);
      return from(dbRequest()).pipe(
        expand(res => {
          if (Math.round(Math.random() * 5) === 0) {
            console.log(`completed sequence ${index}`);
            return EMPTY;
          }
          // The observable returned here gets subscribed to before the 'take' operator
          // below ends the subscription. To prevent an additional call of 'dbRequest'
          // at the end, the observable returned here has to be asynchronous. 
          // That's why 'timer' is used. 
          // If this doesn't turn out to be an issue for you, the line below could be 
          // replace with 'return defer(() => dbRequest())' or even 'return from(dbRequest())'
          return timer(0).pipe(switchMap(() => dbRequest()));
        }),
        concatAll()
      );
    }
    
    function recursiveLargeQuery(index: number) {
      console.log(`started sequence ${index}`);
      largeQuery(index).pipe(
        take(10),
      ).subscribe(
        v => console.log(v), 
        console.error, 
        () => {
          console.log(`stopped to listen for sequence ${index}`);
          if (index < 2) { // end the recursion at some point
            recursiveLargeQuery(++index);
          }
        });
    }
    
    recursiveLargeQuery(0)
    

    https://stackblitz.com/edit/rxjs-ihxkax?file=index.ts

    【讨论】:

      【解决方案2】:

      Observer 有一个参数“close”,表示该订阅者是否已取消订阅。知道这一点,解决方案就很简单了:

      function scroll(observer: Subscriber<number>, res: number[], index: number) {
          if (Math.round(Math.random() * 5) === 0) {
              console.log(`completed sequence ${index}`);
              observer.complete();
              return;
          }
      
          for(let i=0; i<res.length && !observer.closed; i++)
              observer.next(res[i]);
      
          if(!observer.closed)
              dbRequest().then(arr => scroll(observer, arr, index));
      }
      

      编辑:请注意,从技术上讲,您不需要在 for 循环中进行检查 - 所有 .next 都将是一个 noop。

      【讨论】:

      • 谢谢,这解决了我的问题。虽然我期待类似事件的事情,但我可以对此做出反应。
      • 如果你在 observable 构造函数中返回一个函数,当你需要清理时该函数将被调用(例如new Observable(obs =&gt; { obs.next(1); return () =&gt; /* .. cleanup .. */; }))。请注意,由于它是一个返回函数,您仍然需要在上面的同步块中检查observer.closed。
      猜你喜欢
      • 2018-01-29
      • 2014-08-07
      • 2018-11-13
      • 2019-06-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多