【问题标题】:RXJS expand operatorRXJS 扩展运算符
【发布时间】:2017-07-04 03:29:29
【问题描述】:
var offset = 1;
var limit = 500;

var list = new Promise(function (resolve, reject) {
  rets.getAutoLogoutClient(config.clientSettings, (client) => {
    var results = client.search.query(SearchType, Class, Query, {
      limit: limit,
      offset: offset
    });
    resolve(results);
  });
});

var source = Rx.Observable.fromPromise(list);

source.subscribe(results => console.log(results.count));

我正在做一个房地产网站,使用 RETS。

我试图做的查询受到 RETS 服务器的限制,在循环中运行它,增加我的偏移量,直到我拥有所有数据。在我运行查询并找到计数值之前,我不知道计数是多少。

我曾尝试使用 expand,但我不知道它是如何工作的。尝试执行这些多种方式,甚至使用老式的 while 循环,该循环不适用于 .then 方法。因此,自从我在 Angular 4 中使用 RXJS 后,我就转向了它。

这是通过快递完成的。我最终需要运行玉米作业以获取更新的属性,但我的问题是每次获取所有数据并增加偏移量,如果计数高于我的偏移量。例如,运行一个偏移量为 1 且限制为 500 的查询。这里的总数是 1690。所以接下来我的偏移量将是:

offset += limit

获得数据后,我需要将其保存到 MongoDB。我已经能够成功地做到这一点。它只是找到一种无需手动设置偏移量即可获取所有数据的方法。

请注意,服务器限制为 2500,是的,我可以一次性获取所有这些,但还有其他数据,例如媒体,可能超过 2500。

有什么建议吗?

【问题讨论】:

    标签: javascript rxjs rets


    【解决方案1】:

    这实际上是 RxJS 的一个相当常见的用例,因为有很多分页数据源,或者在其他方面限制了您一次可以请求的数据源。

    我的两分钱

    在我看来,expand 可能是最好的运算符,因为您要针对未知数据源进行分页,并且至少需要一个查询才能确定最终计数。如果您知道要查询多少数据,更简单的选择是使用 mergeScan 之类的东西,但我离题了。

    建议的解决方案

    这可能需要您花点力气才能理解,所以我尽可能添加注释来分解这一切是如何工作的。 注意我还没有实际测试过,所以请原谅我的任何语法错误。

    // Your constant limit for any one query
    const limit = 500;
    
    // RxJS helper method that wraps the async call into an Observable
    // I am basing this on what I saw of your sample which leads me to believe
    // that this should work. 
    const clientish = Rx.Observable.bindCallback(rets.getAutoLogoutClient);
    
    // A method wrapper around your query call that wraps the resulting promise
    // into a defer.
    const queryish = (client, params) =>
      // Note the use of defer here is deliberate, since the query returns
      // a promise that will begin executing immediately, this prevents that behavior
      // And forces execution on subscription.
      Rx.Observable.defer(() => client.search.query(SearchType, Class, Query, params));
    
    // This does the actual expansion function
    // Note this is a higher order function because the client and the parameters
    // are available at different times
    const expander = (client) => ({limit, count}) => 
      // Invoke the query method
      queryish(client, {limit, count})
        // Remap the results, update offset and count and forward the whole 
        // package down stream
        .map(results => ({
          limit, 
          count: results.count, 
          offset: offset + limit, 
          results
        }));
    
    
    // Start the stream by constructing the client
    clientish(config.clientSettings)
      .switchMap(client =>
         // This are the arguments for the initial call
         Rx.Observable.of({limit, offset: 0})
           // Call the expander function with the client
           // The second argument is the max concurrency, you can change that if needed
           .expand(expander(client), 1)
    
           // Expand will keep recursing unless you tell it to stop
           // This will halt the execution once offset exceeds count, i.e. you have
           // all the data
           .takeWhile(({count, offset}) => offset < count)
    
           // Further downstream you only care about the results
           // So extract them from the message body and only forward them
           .pluck('results')
      )
      .subscribe(results => /*Do stuff with results*/);
    

    【讨论】:

    • 这不起作用。 switchMap 不是函数。 takeWhile 有语法错误,rets 客户端的东西需要更好地结合起来。
    • @JoshuaScott 您使用的是哪个版本的 RxJS?
    • 我正在使用 RXJS 5
    【解决方案2】:
    const retsConnect = Rx.Observable.create(function(observer) {
      rets.getAutoLogoutClient(config.clientSettings, client => {
        return searchQuery(client, 500, 1, observer);
      });
    });
    
    function searchQuery(client, limit, offset, observer) {
      let currentOffset = offset === undefined || offset === 0 ? 1 : offset;
      return client.search.query(SearchType, Class, Query, {limit: limit, offset: currentOffset})
        .then(results => {
          offset += limit;
          observer.next(results.maxRowsExceeded);
          if (results.maxRowsExceeded) {
            console.log(offset);
            return searchQuery(client, limit, offset, observer);
          } else {
            console.log('Completed');
            observer.complete();
          }
        });
    }
    
    retsConnect.subscribe(val => console.log(val));
    

    所以这与我在这里尝试过的东西有所不同。我仍在调整这个。所以我想做的是更多地分解searchQuery。不知道我是否应该在那里传递observer.next,所以我要弄清楚在哪里映射并再次安装返回searchQuery。我不确定 takeUntil 会采用真假。我需要做的就是将这些数据保存到 mongodb 中。所以我想我可以像这样离开它并将我的保存方法放在那里,但我仍然想弄清楚这一点。

    注意:当还有更多数据时,results.maxRowsExceeded 返回 true。所以一旦 maxRows 返回 false,它就会停止并且所有的数据都已经被获取。

    【讨论】:

      猜你喜欢
      • 2022-01-03
      • 1970-01-01
      • 2017-12-20
      • 2016-09-20
      • 2020-04-13
      • 2018-07-29
      • 2019-02-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多